java activeMQ消息的发送与接收

刺骨的言语ヽ痛彻心扉 2022-06-08 23:51 488阅读 0赞

java activeMQ消息的发送与接收

  1. activemq是我们经常用到的消息队列之一,比如说速度快,对spring的很好的支持,支持多种协议等等,今天我们就来看一下activeMQ消息的发送与接收的源代码。
  2. 我这里使用了两个配置文件,其实在一个配置文件里面就可以完成发送与接收功能,但是为了方便观察日志,我就使用了两个配置文件。在使用代码之前需要搭建好activeMQ消息队列环境。
  3. 一、代码目录结构
  4. 所建立的工程是maven工程,代码结构如图所示:

20170822163146931

  1. 一、maven配置pom.xml
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <groupId>Test</groupId>
  6. <artifactId>Test</artifactId>
  7. <version>0.0.1-SNAPSHOT</version>
  8. <dependencies>
  9. <dependency>
  10. <groupId>com.alibaba</groupId>
  11. <artifactId>fastjson</artifactId>
  12. <version>1.2.16</version>
  13. </dependency>
  14. <dependency>
  15. <groupId>org.json</groupId>
  16. <artifactId>org.json</artifactId>
  17. <version>chargebee-1.0</version>
  18. </dependency>
  19. <dependency>
  20. <groupId>junit</groupId>
  21. <artifactId>junit</artifactId>
  22. <version>4.12</version>
  23. <scope>test</scope>
  24. </dependency>
  25. <dependency>
  26. <groupId>com.cloudhopper.proxool</groupId>
  27. <artifactId>proxool</artifactId>
  28. <version>0.9.1</version>
  29. <exclusions>
  30. <exclusion>
  31. <artifactId>log4j</artifactId>
  32. <groupId>log4j</groupId>
  33. </exclusion>
  34. </exclusions>
  35. </dependency>
  36. <dependency>
  37. <groupId>org.apache.activemq</groupId>
  38. <artifactId>activemq-all</artifactId>
  39. <version>5.11.0</version>
  40. </dependency>
  41. <dependency>
  42. <groupId>org.springframework</groupId>
  43. <artifactId>spring-jms</artifactId>
  44. <version>4.1.4.RELEASE</version>
  45. </dependency>
  46. <dependency>
  47. <groupId>org.springframework</groupId>
  48. <artifactId>spring-jdbc</artifactId>
  49. <version>4.1.4.RELEASE</version>
  50. </dependency>
  51. <dependency>
  52. <groupId>org.springframework</groupId>
  53. <artifactId>spring-core</artifactId>
  54. <version>4.1.4.RELEASE</version>
  55. </dependency>
  56. <dependency>
  57. <groupId>org.springframework</groupId>
  58. <artifactId>spring-beans</artifactId>
  59. <version>4.1.4.RELEASE</version>
  60. </dependency>
  61. <dependency>
  62. <groupId>org.springframework</groupId>
  63. <artifactId>spring-context</artifactId>
  64. <version>4.1.4.RELEASE</version>
  65. </dependency>
  66. <dependency>
  67. <groupId>org.springframework</groupId>
  68. <artifactId>spring-context-support</artifactId>
  69. <version>4.1.4.RELEASE</version>
  70. </dependency>
  71. <dependency>
  72. <groupId>org.apache.cxf</groupId>
  73. <artifactId>cxf-rt-frontend-jaxws</artifactId>
  74. <version>2.7.3</version>
  75. </dependency>
  76. <dependency>
  77. <groupId>org.apache.cxf</groupId>
  78. <artifactId>cxf-rt-transports-http</artifactId>
  79. <version>2.7.3</version>
  80. </dependency>
  81. <dependency>
  82. <groupId>org.apache.cxf</groupId>
  83. <artifactId>cxf-rt-transports-http-jetty</artifactId>
  84. <version>2.4.5</version>
  85. </dependency>
  86. <dependency>
  87. <groupId>org.slf4j</groupId>
  88. <artifactId>slf4j-api</artifactId>
  89. <version>1.7.5</version>
  90. </dependency>
  91. <dependency>
  92. <groupId>com.h2database</groupId>
  93. <artifactId>h2</artifactId>
  94. <version>1.3.152</version>
  95. </dependency>
  96. <dependency>
  97. <groupId>org.apache.httpcomponents</groupId>
  98. <artifactId>httpclient</artifactId>
  99. <version>4.1.1</version>
  100. </dependency>
  101. <dependency>
  102. <groupId>dom4j</groupId>
  103. <artifactId>dom4j</artifactId>
  104. <version>1.6.1</version>
  105. </dependency>
  106. <dependency>
  107. <groupId>commons-dbutils</groupId>
  108. <artifactId>commons-dbutils</artifactId>
  109. <version>1.3</version>
  110. </dependency>
  111. <dependency>
  112. <groupId>org.freemarker</groupId>
  113. <artifactId>freemarker</artifactId>
  114. <version>2.3.16</version>
  115. </dependency>
  116. <dependency>
  117. <groupId>jaxen</groupId>
  118. <artifactId>jaxen</artifactId>
  119. <version>1.1.1</version>
  120. </dependency>
  121. <dependency>
  122. <groupId>net.sourceforge.saxon</groupId>
  123. <artifactId>saxon</artifactId>
  124. <version>9.1.0.8</version>
  125. </dependency>
  126. <dependency>
  127. <groupId>net.sourceforge.saxon</groupId>
  128. <artifactId>saxon</artifactId>
  129. <version>9.1.0.8</version>
  130. <classifier>xqj</classifier>
  131. </dependency>
  132. <dependency>
  133. <groupId>net.sourceforge.saxon</groupId>
  134. <artifactId>saxon</artifactId>
  135. <version>9.1.0.8</version>
  136. <classifier>xpath</classifier>
  137. </dependency>
  138. <dependency>
  139. <groupId>net.sourceforge.saxon</groupId>
  140. <artifactId>saxon</artifactId>
  141. <version>9.1.0.8</version>
  142. <classifier>xom</classifier>
  143. </dependency>
  144. <dependency>
  145. <groupId>net.sourceforge.saxon</groupId>
  146. <artifactId>saxon</artifactId>
  147. <version>9.1.0.8</version>
  148. <classifier>sql</classifier>
  149. </dependency>
  150. <dependency>
  151. <groupId>net.sourceforge.saxon</groupId>
  152. <artifactId>saxon</artifactId>
  153. <version>9.1.0.8</version>
  154. <classifier>s9api</classifier>
  155. </dependency>
  156. <dependency>
  157. <groupId>net.sourceforge.saxon</groupId>
  158. <artifactId>saxon</artifactId>
  159. <version>9.1.0.8</version>
  160. <classifier>jdom</classifier>
  161. </dependency>
  162. <dependency>
  163. <groupId>net.sourceforge.saxon</groupId>
  164. <artifactId>saxon</artifactId>
  165. <version>9.1.0.8</version>
  166. <classifier>dom4j</classifier>
  167. </dependency>
  168. <dependency>
  169. <groupId>net.sourceforge.saxon</groupId>
  170. <artifactId>saxon</artifactId>
  171. <version>9.1.0.8</version>
  172. <classifier>dom</classifier>
  173. </dependency>
  174. <dependency>
  175. <groupId>xom</groupId>
  176. <artifactId>xom</artifactId>
  177. <version>1.1</version>
  178. </dependency>
  179. <dependency>
  180. <groupId>stax</groupId>
  181. <artifactId>stax</artifactId>
  182. <version>1.2.0</version>
  183. </dependency>
  184. <dependency>
  185. <groupId>org.drools</groupId>
  186. <artifactId>drools-core</artifactId>
  187. <version>5.3.0.Final</version>
  188. </dependency>
  189. <dependency>
  190. <groupId>org.drools</groupId>
  191. <artifactId>drools-compiler</artifactId>
  192. <version>5.3.0.Final</version>
  193. </dependency>
  194. <dependency>
  195. <groupId>commons-io</groupId>
  196. <artifactId>commons-io</artifactId>
  197. <version>2.2</version>
  198. </dependency>
  199. <dependency>
  200. <groupId>javax.xml.bind</groupId>
  201. <artifactId>jsr173_api</artifactId>
  202. <version>1.0</version>
  203. </dependency>
  204. <dependency>
  205. <groupId>org.springframework</groupId>
  206. <artifactId>spring-test</artifactId>
  207. <version>4.1.4.RELEASE</version>
  208. </dependency>
  209. <dependency>
  210. <groupId>corba</groupId>
  211. <artifactId>corba-connect</artifactId>
  212. <version>0.0.2</version>
  213. </dependency>
  214. <dependency>
  215. <groupId>com.ustcinfo.inm.data</groupId>
  216. <artifactId>inm-data-spi</artifactId>
  217. <version>0.0.1-SNAPSHOT</version>
  218. </dependency>
  219. <dependency>
  220. <groupId>com.taobao.metamorphosis</groupId>
  221. <artifactId>metamorphosis-client</artifactId>
  222. <version>1.4.4</version>
  223. </dependency>
  224. <dependency>
  225. <groupId>com.googlecode.aviator</groupId>
  226. <artifactId>aviator</artifactId>
  227. <version>2.2.1</version>
  228. </dependency>
  229. <dependency>
  230. <groupId>com.jcraft</groupId>
  231. <artifactId>jsch</artifactId>
  232. <version>0.1.50</version>
  233. </dependency>
  234. </dependencies>
  235. <build>
  236. <sourceDirectory>src</sourceDirectory>
  237. <plugins>
  238. <plugin>
  239. <artifactId>maven-compiler-plugin</artifactId>
  240. <version>3.3</version>
  241. <configuration>
  242. <source>1.8</source>
  243. <target>1.8</target>
  244. </configuration>
  245. </plugin>
  246. </plugins>
  247. </build>
  248. </project>
  249. 二、LoadUtil.java加载类
  250. package www.activemq.load; import javax.jms.Destination; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jms.core.JmsTemplate; /** * 加载配置文件工具类 * @author Administrator * */ public class LoadUtil { ApplicationContext applicationContext; JmsTemplate template;//jsm对象,可以发送和消费消息 Destination destination;//队列名称 /** * 加载方法 * @param path 文件路径名称 */ public void load(String path){ applicationContext = new ClassPathXmlApplicationContext(path); template = (JmsTemplate) applicationContext.getBean("jmsTemplate"); destination = (Destination) applicationContext.getBean("queueDestination"); this.setApplicationContext(applicationContext); this.setDestination(destination); this.setDestination(destination); } public ApplicationContext getApplicationContext() { return applicationContext; } public void setApplicationContext(ApplicationContext applicationContext) { this.applicationContext = applicationContext; } public JmsTemplate getTemplate() { return template; } public void setTemplate(JmsTemplate template) { this.template = template; } public Destination getDestination() { return destination; } public void setDestination(Destination destination) { this.destination = destination; } }
  251. 三、QueueProducer.java发送消息到activeMQ
  252. package www.avtivemq.sendmessage; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import www.activemq.load.LoadUtil; public class QueueProducer{ /** * 发送消息到activemq的实现方法 * @param msg //发送消息的内容,为字符串类型 */ public void sendMessage(String msg) { LoadUtil lu = new LoadUtil(); lu.load("activemq-config.xml"); JmsTemplate template = lu.getTemplate(); String destination = template.getDefaultDestination().toString(); System.out.println("向队列" + destination + "发送了消息:" + msg); template.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(msg); } }); } /** * 主方法 * @param args */ public static void main(String[] args) { QueueProducer producer = new QueueProducer(); producer.sendMessage("zcinfo_test"); } }
  253. 四、activemq-config.xml配置文件
  254. <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.1.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <!--配置连接工厂地址--> <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.0.81.80:8765"></property> <!-- <property name="brokerURL" value="failover:(tcp://tcp://192.0.81.83:8765,tcp://tcp://192.0.81.84:8765,tcp://tcp://192.0.81.85:8765)"></property> --> </bean> <!--配置队列名称 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>UEAP_TO_GZZC_QUEUE_TEST</value> </constructor-arg> </bean> <!-- 配置JMS模版,这是spring提供的 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="activeMQConnectionFactory"></property> <property name="defaultDestination" ref="queueDestination"></property> <property name="receiveTimeout" value="10000"></property> </bean> </beans>
  255. 五、ActivemqConsumer.java消费类
  256. package www.activemq.receivemessage;
  257. import javax.jms.Message;
  258. import javax.jms.MessageListener;
  259. import javax.jms.TextMessage;
  260. import www.activemq.load.LoadUtil;
  261. public class ActivemqConsumer implements MessageListener{
  262. @Override
  263. public void onMessage(Message message) {
  264. TextMessage tm = (TextMessage)message;
  265. System.out.println("监听到MQ中有数据......");
  266. try {
  267. System.out.println("获取MQ中数据信息>>>>>>>>>>" + tm.getText());
  268. }catch (Exception e) {
  269. e.printStackTrace();
  270. }
  271. }
  272. /**
  273. * 加载配置文件之后监听器会自动调用onMessage方法,并且保持服务一直开启很实用
  274. * @param args
  275. */
  276. public static void main(String[] args) {
  277. new LoadUtil().load("activemq-context.xml");
  278. }
  279. }
  280. 六、activemq-context.xml配置文件
  281. <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd"> <context:component-scan base-package="com.starit.analyse" /> <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <property name="connectionFactory" ref="connectionFactory"/> </bean> <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <!-- <property name="brokerURL" value="failover:(tcp://192.0.81.83:8765:8400,tcp://192.0.81.84:8765,tcp://192.0.81.85:8765)"/> --> <property name="brokerURL" value="failover:(tcp://192.0.81.80:8765)?jms.prefetchPolicy.all=1"/> </bean> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory"/> </bean> <!--这个是队列目的地--> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>UEAP_TO_GZZC_QUEUE_TEST</value> </constructor-arg> </bean> <!-- 消息监听器 加载此处会自动调用监听方法并且一直保持服务开启,很实用 --> <bean id="consumerMessageListener" class="www.activemq.receivemessage.ActivemqConsumer"/> <!-- 消息监听容器 --> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="consumerMessageListener" /> <property name="concurrentConsumers" value="1"/> </bean> </beans>
  282. 七、测试结果
  283. 1)发送消息

20170822171033778

  1. 2activemq显示收到消息

20170822171041983

  1. 3)接收消息,服务一直开启接收发送者消息

20170822171048584

发表评论

表情:
评论列表 (有 0 条评论,488人围观)

还没有评论,来说两句吧...

相关阅读