ActiveMQ环境搭建及实例详解

雨点打透心脏的1/2处 2022-06-05 03:26 746阅读 0赞

ActiveMQ环境搭建及实例详解

前言:
JMS(Java Messaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。而ActiveMQ是这个规范的一个具体实现。

ActiveMQ是什么?:ActiveMQ是一个消息中间件的一种
ActiveMQ干什么?:他提供标准的产生、发送、接收消息接口
ActiveMQ怎么用?:JAVA开发的时候就可以直接实现相关的接口,完成JAVA应用程序之间的消息发送。

1、ActiveMQ安装部署(Windows)

安装包下载链接:
http://pan.baidu.com/s/1dF71zGH
密码:4tv3

这是一个免安装版的,解压后,双击下图的文件,就可以启动ActiveMQ这个消息中间件)(相当于一个服务器),当我们编辑好相关接口的实现类,发送消息,就能实时接收到消息了:

这里写图片描述

启动后的界面:
这里写图片描述

代码详情:

  1. wrapper | --> Wrapper Started as Console
  2. wrapper | Launching a JVM...
  3. jvm 1 | Wrapper (Version 3.2.3) http://wrapper.tanukisoftware.org
  4. jvm 1 | Copyright 1999-2006 Tanuki Software, Inc. All Rights Reserved.
  5. jvm 1 |
  6. jvm 1 | Java Runtime: Oracle Corporation 1.8.0_144 C:\Program Files\Java\jre1.8.0_144
  7. jvm 1 | Heap sizes: current=251392k free=235660k max=932352k
  8. jvm 1 | JVM args: -Dactivemq.home=../.. -Dactivemq.base=../.. -Djavax.net.ssl.keyStorePassword=password -Djavax.net.ssl.trustStorePassword=password -Djavax.net.ssl.keyStore=../../conf/broker.ks -Djavax.net.ssl.trustStore=../../conf/broker.ts -Dcom.sun.management.jmxremote -Dorg.apache.activemq.UseDedicatedTaskRunner=true -Djava.util.logging.config.file=logging.properties -Dactivemq.conf=../../conf -Dactivemq.data=../../data -Djava.security.auth.login.config=../../conf/login.config -Xmx1024m -Djava.library.path=../../bin/win64 -Dwrapper.key=5q7ouaKXTUiBhUBy -Dwrapper.port=32000 -Dwrapper.jvm.port.min=31000 -Dwrapper.jvm.port.max=31999 -Dwrapper.pid=16200 -Dwrapper.version=3.2.3 -Dwrapper.native_library=wrapper -Dwrapper.cpu.timeout=10 -Dwrapper.jvmid=1
  9. jvm 1 | Extensions classpath:
  10. jvm 1 | [..\..\lib,..\..\lib\camel,..\..\lib\optional,..\..\lib\web,..\..\lib\extra]
  11. jvm 1 | ACTIVEMQ_HOME: ..\..
  12. jvm 1 | ACTIVEMQ_BASE: ..\..
  13. jvm 1 | ACTIVEMQ_CONF: ..\..\conf
  14. jvm 1 | ACTIVEMQ_DATA: ..\..\data
  15. jvm 1 | Loading message broker from: xbean:activemq.xml
  16. jvm 1 | INFO | Refreshing org.apache.activemq.xbean.XBeanBrokerFactory$1@5ea3ef68: startup date [Thu Nov 16 17:38:15 CST 2017]; root of context hierarchy
  17. jvm 1 | INFO | PListStore:[D:\apache-activemq-5.11.2\bin\win64\..\..\data\localhost\tmp_storage] started
  18. jvm 1 | INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[D:\apache-activemq-5.11.2\bin\win64\..\..\data\kahadb]
  19. jvm 1 | INFO | KahaDB is version 5
  20. jvm 1 | INFO | Recovering from the journal ...
  21. jvm 1 | INFO | Recovery replayed 5 operations from the journal in 0.022 seconds.
  22. jvm 1 | INFO | Apache ActiveMQ 5.11.2 (localhost, ID:DESKTOP-TKN3IRE-49495-1510825096880-0:1) is starting
  23. jvm 1 | INFO | Listening for connections at: tcp://DESKTOP-TKN3IRE:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600
  24. jvm 1 | INFO | Connector openwire started
  25. jvm 1 | INFO | Listening for connections at: amqp://DESKTOP-TKN3IRE:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600
  26. jvm 1 | INFO | Connector amqp started
  27. jvm 1 | INFO | Listening for connections at: stomp://DESKTOP-TKN3IRE:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600
  28. jvm 1 | INFO | Connector stomp started
  29. jvm 1 | INFO | Listening for connections at: mqtt://DESKTOP-TKN3IRE:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600
  30. jvm 1 | INFO | Connector mqtt started
  31. jvm 1 | {}
  32. jvm 1 | INFO | Listening for connections at ws://DESKTOP-TKN3IRE:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600
  33. jvm 1 | INFO | Connector ws started
  34. jvm 1 | INFO | Apache ActiveMQ 5.11.2 (localhost, ID:DESKTOP-TKN3IRE-49495-1510825096880-0:1) started
  35. jvm 1 | INFO | For help or more information please see: http://activemq.apache.org
  36. jvm 1 | INFO | ActiveMQ WebConsole available at http://0.0.0.0:8161/
  37. jvm 1 | INFO | Initializing Spring FrameworkServlet 'dispatcher'
  38. jvm 1 | INFO | jolokia-agent: No access restrictor found at classpath:/jolokia-access.xml, access to all MBeans is allowed

注:请读者自己解析打印的信息,很重要,也不难。

1.1、ActiveMQ控制台界面

  1. ActiveMQ默认启动到8161端口,启动完了后在浏览器地址栏输入:[http://localhost:8161/admin要求输入用户名密码,默认用户名密码为admin、admin,这个用户名密码是在][http_localhost_8161_admin_admin_admin]**conf/users.properties**中配置的。输入用户名密码后便可看到如下图的ActiveMQ控制台界面了。

这里写图片描述

1.2、ActiveMQ目录结构

这里写图片描述

目录结构解析:
bin:存放的是脚本文件
conf:存放的是基本配置文件
data:存放的是日志文件
docs:存放的是说明文档
examples:存放的是简单的实例
lib:存放的是activemq所需jar包
webapps:用于存放项目的目录

2、ActiveMQ模型

  1. 前面我们已经搭建和配置好了ActiveMQ,下面来看一个Demo,体验一下MQJMS消息服务应用程序结构支持两种模型:点对点模型,发布者/订阅者模型。

2.1、点对点模型(Queue)

一个生产者向一个特定的队列发布消息,一个消费者从这个队列中依次读取消息。
 模型特点:只有一个消费者获得消息。
 
这里写图片描述

2.2、发布者/订阅者模型(Topic)

0个或多个订阅者可以接受特定主题的消息。
 模型特点:多个消费者可获得消息。
 
 这里写图片描述  
    
Topic和Queue的最大区别在于Topic是以广播的形式,通知所有在线监听的客户端有新的消息,没有监听的客户端将收不到消息;而Queue则是以点对点的形式通知多个处于监听状态的客户端中的一个。

JMS五种消息格式:
MapMessage :key-value键值对
TextMessage :字符串对象
ObjcetMessage : 一个序列化的Java对象
ByteMessage :一个未解释字节的数据流
StreamMessage : Java原始值的数据流

2.3、ActiveMQ开发流程

  1. ![这里写图片描述][SouthEast 6]

1、根据用户名、密码、连接地址获取ActiveMQConnectionFactory对象
2、由连接工厂获取创建连接Connection
4、由连接对象创建会话Session(重要)
5、由会话对象Session根据实际需要创建消息目的地Destination、消息生产者MessageProducer、消息消费者MessageConsumer、消息格式类型(如:TextMessage)
6、发送消息: messageProducer.send(message);
7、接受消息:messageConsumer.receive(100000);

3、ActiveMQ使用之Queue(点对点消息实现)

  1. 如下图,创建一个JAVA项目,引入activemq-all-5.11.1.jar包:
  2. ![这里写图片描述][SouthEast 7]

3.1、创建消息生产者(Queue):

  1. package Producer;
  2. import javax.jms.Connection;
  3. import javax.jms.ConnectionFactory;
  4. import javax.jms.Destination;
  5. import javax.jms.JMSException;
  6. import javax.jms.MessageProducer;
  7. import javax.jms.Session;
  8. import javax.jms.TextMessage;
  9. import org.apache.activemq.ActiveMQConnection;
  10. import org.apache.activemq.ActiveMQConnectionFactory;
  11. /**
  12. * 消息生产者
  13. * @author yang
  14. *
  15. */
  16. public class JMSProducer {
  17. private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
  18. private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
  19. private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
  20. private static final int SENDNUM=10; // 发送的消息数量
  21. /**
  22. * 发送消息 静态类
  23. * @param session 会话对象,创建本次会话的基本配置
  24. * @param messageProducer 消息生产者对象,将本次消息发送给Queue
  25. * @throws Exception
  26. */
  27. public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{
  28. for(int i=0;i<JMSProducer.SENDNUM;i++){
  29. //创建消息的格式(以这种格式发送): TextMessage
  30. TextMessage message=session.createTextMessage("ActiveMQ 发送过来的消息"+i+"第二次");
  31. //后台打印
  32. System.out.println("发送消息:"+"ActiveMQ 发送过来的消息"+i+"第二次");
  33. //将消息发送给Queue
  34. messageProducer.send(message);
  35. }
  36. }
  37. public static void main(String[] args) {
  38. ConnectionFactory connectionFactory; // 连接工厂
  39. Connection connection = null; // 连接
  40. Session session; // 会话 接受或者发送消息的线程
  41. Destination destination; // 消息的目的地
  42. MessageProducer messageProducer; // 消息生产者
  43. // 实例化连接工厂
  44. connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);
  45. try {
  46. connection=connectionFactory.createConnection(); // 通过连接工厂获取连接
  47. connection.start(); // 启动连接
  48. session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session
  49. destination=session.createQueue("FirstQueue1"); // 创建消息队列
  50. messageProducer=session.createProducer(destination); // 创建消息生产者
  51. // 发送消息 ,本质上是将TextMessage类型的消息发送给Queue这个消息队列,这是消息的目的地
  52. sendMessage(session, messageProducer);
  53. session.commit();
  54. } catch (Exception e) {
  55. e.printStackTrace();
  56. } finally{
  57. if(connection!=null){
  58. try {
  59. connection.close();
  60. } catch (JMSException e) {
  61. e.printStackTrace();
  62. }
  63. }
  64. }
  65. }
  66. }

3.2、消息接收者:

  1. package Consumer;
  2. import javax.jms.Connection;
  3. import javax.jms.ConnectionFactory;
  4. import javax.jms.Destination;
  5. import javax.jms.JMSException;
  6. import javax.jms.MessageConsumer;
  7. import javax.jms.Session;
  8. import javax.jms.TextMessage;
  9. import org.apache.activemq.ActiveMQConnection;
  10. import org.apache.activemq.ActiveMQConnectionFactory;
  11. /**
  12. * 消息消费者
  13. * @author yang
  14. *
  15. */
  16. public class JMSConsumer {
  17. private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
  18. private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
  19. private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
  20. public static void main(String[] args) {
  21. ConnectionFactory connectionFactory; // 连接工厂
  22. Connection connection = null; // 连接
  23. Session session; // 会话 接受或者发送消息的线程
  24. Destination destination; // 消息的目的地
  25. MessageConsumer messageConsumer; // 消息的消费者
  26. // 实例化连接工厂
  27. connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
  28. try {
  29. connection=connectionFactory.createConnection(); // 通过连接工厂获取连接
  30. connection.start(); // 启动连接
  31. // 创建Session
  32. session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
  33. // 创建连接的消息队列 (我们消费者要去哪个队列上读取消息)
  34. destination=session.createQueue("FirstQueue1");
  35. // 创建消息消费者
  36. messageConsumer=session.createConsumer(destination);
  37. while(true){
  38. //取出队列中 TextMessage格式的消息
  39. TextMessage textMessage=(TextMessage)messageConsumer.receive(100000);
  40. if(textMessage!=null){
  41. System.out.println("收到的消息:"+textMessage.getText());//消息格式转化+打印
  42. }else{
  43. break;
  44. }
  45. }
  46. } catch (JMSException e) {
  47. e.printStackTrace();
  48. }
  49. }
  50. }

3.3、启动ActiveMQ

3.3.1、双击下面图标:

这里写图片描述

3.3.2、出现下面的界面:

这里写图片描述

*表示ActiveMQ的队列(Queue)开始工作,能够接受来自生产者的消息,消费者能够从队列中获得消息。

3.3.2、启动消息生产者:

这里写图片描述
*消息已经发送到Queue中

3.3.2、启动消息消费者:

这里写图片描述

4、ActiveMQ使用之Queue(引入监听点对点消息)

  1. 使用上面的方式有一个弊端:需要,每次操作消费者才能收到消息,使用监听后,消费者实时监听Queue,当生产者发送消息时,就能接受到消息。

4.1、创建一个监听对象

  1. package Listener;
  2. import javax.jms.JMSException;
  3. import javax.jms.Message;
  4. import javax.jms.MessageListener;
  5. import javax.jms.TextMessage;
  6. /**
  7. * 监听器类:
  8. * @author yang
  9. *
  10. */
  11. public class Listener implements MessageListener {
  12. @Override
  13. public void onMessage(Message message) {
  14. try {
  15. System.out.println("收到的消息:"+((TextMessage)message).getText());
  16. } catch (JMSException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. }

4.2、创建一个消费者,引入监听对象

  1. package Consumer;
  2. import javax.jms.Connection;
  3. import javax.jms.ConnectionFactory;
  4. import javax.jms.Destination;
  5. import javax.jms.JMSException;
  6. import javax.jms.MessageConsumer;
  7. import javax.jms.Session;
  8. import javax.jms.TextMessage;
  9. import org.apache.activemq.ActiveMQConnection;
  10. import org.apache.activemq.ActiveMQConnectionFactory;
  11. import Listener.Listener;
  12. /**
  13. * 消息消费者中注册监听器:
  14. * @author yang
  15. *
  16. */
  17. public class JMSConsumer2 {
  18. private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
  19. private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
  20. private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
  21. public static void main(String[] args) {
  22. ConnectionFactory connectionFactory; // 连接工厂
  23. Connection connection = null; // 连接
  24. Session session; // 会话 接受或者发送消息的线程
  25. Destination destination; // 消息的目的地
  26. MessageConsumer messageConsumer; // 消息的消费者
  27. // 实例化连接工厂
  28. connectionFactory=new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);
  29. try {
  30. connection=connectionFactory.createConnection(); // 通过连接工厂获取连接
  31. connection.start(); // 启动连接
  32. session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
  33. destination=session.createQueue("FirstQueue1"); // 创建连接的消息队列
  34. messageConsumer=session.createConsumer(destination); // 创建消息消费者
  35. messageConsumer.setMessageListener(new Listener()); // 注册消息监听
  36. } catch (JMSException e) {
  37. e.printStackTrace();
  38. }
  39. }
  40. }

4.3、启动消费者,实时监听Queue

这里写图片描述

4.4、多次启动生产者,发送消息

这里写图片描述

4.5、消费者自动获得消息

这里写图片描述

4.5、消费者第二次自动获得消息

这里写图片描述

这里写图片描述

5、ActiveMQ 使用之Topic(发布-订阅消息模式实现)

  1. 发布订阅模式:发布者发布消息后,只要订阅了该发布者的订阅者,都能够接受到数据,因此,要想接收数据,必须先要订阅。一个生产者发布对应多个消费者订阅。
  2. 接着上面的项目,我们再新增一个监听器,一个消费者,项目如下图所示:

这里写图片描述

###5.1、消息生产者(Topic)

  1. package Producer;
  2. import javax.jms.Connection;
  3. import javax.jms.ConnectionFactory;
  4. import javax.jms.Destination;
  5. import javax.jms.JMSException;
  6. import javax.jms.MessageProducer;
  7. import javax.jms.Session;
  8. import javax.jms.TextMessage;
  9. import org.apache.activemq.ActiveMQConnection;
  10. import org.apache.activemq.ActiveMQConnectionFactory;
  11. /**
  12. * 消息生产者
  13. * @author yang
  14. *
  15. */
  16. public class JMSProducer2 {
  17. private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
  18. private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
  19. private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
  20. private static final int SENDNUM=10; // 发送的消息数量
  21. /**
  22. * 发送消息 静态类
  23. * @param session 会话对象,创建本次会话的基本配置
  24. * @param messageProducer 消息生产者对象,将本次消息发送给Queue
  25. * @throws Exception
  26. */
  27. public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{
  28. for(int i=0;i<JMSProducer2.SENDNUM;i++){
  29. //创建消息的格式(以这种格式发送): TextMessage
  30. TextMessage message=session.createTextMessage("ActiveMQ 发送过来的消息"+i+"第三次");
  31. //后台打印
  32. System.out.println("发送消息:"+"ActiveMQ 发送过来的消息"+i+"第三次");
  33. //将消息发送给Queue
  34. messageProducer.send(message);
  35. }
  36. }
  37. public static void main(String[] args) {
  38. ConnectionFactory connectionFactory; // 连接工厂
  39. Connection connection = null; // 连接
  40. Session session; // 会话 接受或者发送消息的线程
  41. Destination destination; // 消息的目的地
  42. MessageProducer messageProducer; // 消息生产者
  43. // 实例化连接工厂
  44. connectionFactory=new ActiveMQConnectionFactory(JMSProducer2.USERNAME, JMSProducer2.PASSWORD, JMSProducer2.BROKEURL);
  45. try {
  46. connection=connectionFactory.createConnection(); // 通过连接工厂获取连接
  47. connection.start(); // 启动连接
  48. session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session
  49. destination=session.createTopic("FirstTopic1");//创建消息发布
  50. messageProducer=session.createProducer(destination); // 创建消息生产者
  51. // 发送消息 ,本质上是将TextMessage类型的消息发送给Queue这个消息队列,这是消息的目的地
  52. sendMessage(session, messageProducer);
  53. session.commit();
  54. } catch (Exception e) {
  55. e.printStackTrace();
  56. } finally{
  57. if(connection!=null){
  58. try {
  59. connection.close();
  60. } catch (JMSException e) {
  61. e.printStackTrace();
  62. }
  63. }
  64. }
  65. }
  66. }

5.2、创建二个监听

  1. package Listener;
  2. import javax.jms.JMSException;
  3. import javax.jms.Message;
  4. import javax.jms.MessageListener;
  5. import javax.jms.TextMessage;
  6. /**
  7. * 监听器类:
  8. * @author yang
  9. *
  10. */
  11. public class Listener implements MessageListener {
  12. @Override
  13. public void onMessage(Message message) {
  14. try {
  15. System.out.println("第一个消费者收到的消息:"+((TextMessage)message).getText());
  16. } catch (JMSException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. }
  21. package Listener;
  22. import javax.jms.JMSException;
  23. import javax.jms.Message;
  24. import javax.jms.MessageListener;
  25. import javax.jms.TextMessage;
  26. /**
  27. * 监听器类:
  28. * @author yang
  29. *
  30. */
  31. public class Listener2 implements MessageListener {
  32. @Override
  33. public void onMessage(Message message) {
  34. try {
  35. System.out.println("第二个消费者收到的消息:"+((TextMessage)message).getText());
  36. } catch (JMSException e) {
  37. e.printStackTrace();
  38. }
  39. }
  40. }

5.3、创建二个消费者

  1. package Consumer;
  2. import javax.jms.Connection;
  3. import javax.jms.ConnectionFactory;
  4. import javax.jms.Destination;
  5. import javax.jms.JMSException;
  6. import javax.jms.MessageConsumer;
  7. import javax.jms.Session;
  8. import javax.jms.TextMessage;
  9. import org.apache.activemq.ActiveMQConnection;
  10. import org.apache.activemq.ActiveMQConnectionFactory;
  11. import Listener.Listener;
  12. /**
  13. * 消息消费者中注册监听器:
  14. * @author yang
  15. *
  16. */
  17. public class JMSConsumer2 {
  18. private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
  19. private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
  20. private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
  21. public static void main(String[] args) {
  22. ConnectionFactory connectionFactory; // 连接工厂
  23. Connection connection = null; // 连接
  24. Session session; // 会话 接受或者发送消息的线程
  25. Destination destination; // 消息的目的地
  26. MessageConsumer messageConsumer; // 消息的消费者
  27. // 实例化连接工厂
  28. connectionFactory=new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);
  29. try {
  30. connection=connectionFactory.createConnection(); // 通过连接工厂获取连接
  31. connection.start(); // 启动连接
  32. session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
  33. //destination=session.createQueue("FirstQueue1"); // 创建连接的消息队列
  34. destination=session.createTopic("FirstTopic1"); //创建连接的消息主题
  35. messageConsumer=session.createConsumer(destination); // 创建消息消费者
  36. messageConsumer.setMessageListener(new Listener()); // 注册消息监听
  37. } catch (JMSException e) {
  38. e.printStackTrace();
  39. }
  40. }
  41. }
  42. package Consumer;
  43. import javax.jms.Connection;
  44. import javax.jms.ConnectionFactory;
  45. import javax.jms.Destination;
  46. import javax.jms.JMSException;
  47. import javax.jms.MessageConsumer;
  48. import javax.jms.Session;
  49. import javax.jms.TextMessage;
  50. import org.apache.activemq.ActiveMQConnection;
  51. import org.apache.activemq.ActiveMQConnectionFactory;
  52. import Listener.Listener;
  53. import Listener.Listener2;
  54. /**
  55. * 消息消费者中注册监听器:
  56. * @author yang
  57. *
  58. */
  59. public class JMSConsumer3 {
  60. private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
  61. private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
  62. private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
  63. public static void main(String[] args) {
  64. ConnectionFactory connectionFactory; // 连接工厂
  65. Connection connection = null; // 连接
  66. Session session; // 会话 接受或者发送消息的线程
  67. Destination destination; // 消息的目的地
  68. MessageConsumer messageConsumer; // 消息的消费者
  69. // 实例化连接工厂
  70. connectionFactory=new ActiveMQConnectionFactory(JMSConsumer3.USERNAME, JMSConsumer3.PASSWORD, JMSConsumer3.BROKEURL);
  71. try {
  72. connection=connectionFactory.createConnection(); // 通过连接工厂获取连接
  73. connection.start(); // 启动连接
  74. session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
  75. //destination=session.createQueue("FirstQueue1"); // 创建连接的消息队列
  76. destination=session.createTopic("FirstTopic1"); //创建连接的消息主题
  77. messageConsumer=session.createConsumer(destination); // 创建消息消费者
  78. messageConsumer.setMessageListener(new Listener2()); // 注册消息监听
  79. } catch (JMSException e) {
  80. e.printStackTrace();
  81. }
  82. }
  83. }

先启动两个消费者,在启动生产者就能获得消息。

这里写图片描述

这里写图片描述

这里写图片描述

这里写图片描述

附源码下载(打包好了,导入就可以使用):ActiveMQ环境搭建及实例详解的源码

开发环境:
window:64位
JDK:1.8

发表评论

表情:
评论列表 (有 0 条评论,746人围观)

还没有评论,来说两句吧...

相关阅读