ActiveMQ实战(三)--ActiveMQ的通信方式之主题发布订阅式(publish-subscribe)

心已赠人 2022-06-12 12:07 408阅读 0赞

一、简介

发布/订阅方式用于多接收客户端的方式.作为发布订阅的方式,可能存在多个接收客户端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。作为subscriber ,在接收消息时有两种方法,destination的receive方法,和实现message listener 接口的onMessage 方法。

发布订阅模式有点类似于我们日常生活中订阅报纸。每年到年尾的时候,邮局就会发一本报纸集合让我们来选择订阅哪一个。在这个表里头列了所有出版发行的报纸,那么对于我们每一个订阅者来说,我们可以选择一份或者多份报纸。比如北京日报、潇湘晨报等。那么这些个我们订阅的报纸,就相当于发布订阅模式里的topic。有很多个人订阅报纸,也有人可能和我订阅了相同的报纸。那么,在这里,相当于我们在同一个topic里注册了。对于一份报纸发行方来说,它和所有的订阅者就构成了一个1对多的关系。这种关系如下图所示:

20170722212708095

注意: Subscriber只有在订阅了主题Topic后,才能接收到Publisher发送的消息。

二、实战主题发布订阅式通信实战

首先我们必须创建的是publisher,然后才是subscribe:

1)Publisher

创建两个消息订阅者:

JMSConsumer1

  1. package com.fendo.mq.publishsubscribe;
  2. import javax.jms.Connection;
  3. import javax.jms.ConnectionFactory;
  4. import javax.jms.Destination;
  5. import javax.jms.JMSException;
  6. import javax.jms.MessageConsumer;
  7. import javax.jms.Session;
  8. import org.apache.activemq.ActiveMQConnection;
  9. import org.apache.activemq.ActiveMQConnectionFactory;
  10. /**
  11. * 消息消费者-消息订阅者一
  12. * @author fendo
  13. *
  14. */
  15. public class JMSConsumer1 {
  16. private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
  17. private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
  18. private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
  19. public static void main(String[] args) {
  20. ConnectionFactory connectionFactory; // 连接工厂
  21. Connection connection = null; // 连接
  22. Session session; // 会话 接受或者发送消息的线程
  23. Destination destination; // 消息的目的地
  24. MessageConsumer messageConsumer; // 消息的消费者
  25. // 实例化连接工厂
  26. connectionFactory=new ActiveMQConnectionFactory(JMSConsumer1.USERNAME, JMSConsumer1.PASSWORD, JMSConsumer1.BROKEURL);
  27. try {
  28. connection=connectionFactory.createConnection(); // 通过连接工厂获取连接
  29. connection.start(); // 启动连接
  30. session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
  31. destination=session.createTopic("FirstTopic1");
  32. messageConsumer=session.createConsumer(destination); // 创建消息消费者
  33. messageConsumer.setMessageListener(new Listener1()); // 注册消息监听
  34. } catch (JMSException e) {
  35. e.printStackTrace();
  36. }
  37. }
  38. }

JMSConsumer2

  1. package com.fendo.mq.publishsubscribe;
  2. import javax.jms.Connection;
  3. import javax.jms.ConnectionFactory;
  4. import javax.jms.Destination;
  5. import javax.jms.JMSException;
  6. import javax.jms.MessageConsumer;
  7. import javax.jms.Session;
  8. import org.apache.activemq.ActiveMQConnection;
  9. import org.apache.activemq.ActiveMQConnectionFactory;
  10. /**
  11. * 消息消费者-消息订阅者二
  12. * @author fendo
  13. *
  14. */
  15. public class JMSConsumer2 {
  16. private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
  17. private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
  18. private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
  19. public static void main(String[] args) {
  20. ConnectionFactory connectionFactory; // 连接工厂
  21. Connection connection = null; // 连接
  22. Session session; // 会话 接受或者发送消息的线程
  23. Destination destination; // 消息的目的地
  24. MessageConsumer messageConsumer; // 消息的消费者
  25. // 实例化连接工厂
  26. connectionFactory=new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);
  27. try {
  28. connection=connectionFactory.createConnection(); // 通过连接工厂获取连接
  29. connection.start(); // 启动连接
  30. session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
  31. destination=session.createTopic("FirstTopic1");
  32. messageConsumer=session.createConsumer(destination); // 创建消息消费者
  33. messageConsumer.setMessageListener(new Listener2()); // 注册消息监听
  34. } catch (JMSException e) {
  35. // TODO Auto-generated catch block
  36. e.printStackTrace();
  37. }
  38. }
  39. }

2)subscribe

JMSProducer

  1. package com.fendo.mq.publishsubscribe;
  2. import javax.jms.Connection;
  3. import javax.jms.ConnectionFactory;
  4. import javax.jms.Destination;
  5. import javax.jms.JMSException;
  6. import javax.jms.MessageProducer;
  7. import javax.jms.Session;
  8. import javax.jms.TextMessage;
  9. import org.apache.activemq.ActiveMQConnection;
  10. import org.apache.activemq.ActiveMQConnectionFactory;
  11. /**
  12. * 消息生产者-消息发布者
  13. * @author fendo
  14. *
  15. */
  16. public class JMSProducer {
  17. private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
  18. private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
  19. private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
  20. private static final int SENDNUM=10; // 发送的消息数量
  21. public static void main(String[] args) {
  22. ConnectionFactory connectionFactory; // 连接工厂
  23. Connection connection = null; // 连接
  24. Session session; // 会话 接受或者发送消息的线程
  25. Destination destination; // 消息的目的地
  26. MessageProducer messageProducer; // 消息生产者
  27. // 实例化连接工厂
  28. connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);
  29. try {
  30. connection=connectionFactory.createConnection(); // 通过连接工厂获取连接
  31. connection.start(); // 启动连接
  32. session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session
  33. destination=session.createTopic("FirstTopic1"); //创建消息发布者
  34. messageProducer=session.createProducer(destination); // 创建消息生产者
  35. sendMessage(session, messageProducer); // 发送消息
  36. session.commit();
  37. } catch (Exception e) {
  38. // TODO Auto-generated catch block
  39. e.printStackTrace();
  40. } finally{
  41. if(connection!=null){
  42. try {
  43. connection.close();
  44. } catch (JMSException e) {
  45. // TODO Auto-generated catch block
  46. e.printStackTrace();
  47. }
  48. }
  49. }
  50. }
  51. /**
  52. * 发送消息
  53. * @param session
  54. * @param messageProducer
  55. * @throws Exception
  56. */
  57. public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{
  58. for(int i=0;i<JMSProducer.SENDNUM;i++){
  59. TextMessage message=session.createTextMessage("ActiveMQ 发送的消息"+i);
  60. System.out.println("发送消息:"+"ActiveMQ 发布的消息"+i);
  61. messageProducer.send(message);
  62. }
  63. }
  64. }

3)消息监听者

Listener1

  1. package com.fendo.mq.publishsubscribe;
  2. import javax.jms.JMSException;
  3. import javax.jms.Message;
  4. import javax.jms.MessageListener;
  5. import javax.jms.TextMessage;
  6. /**
  7. * 消息监听-订阅者一
  8. * @author fendo
  9. *
  10. */
  11. public class Listener1 implements MessageListener{
  12. @Override
  13. public void onMessage(Message message) {
  14. try {
  15. System.out.println("订阅者一收到的消息:"+((TextMessage)message).getText());
  16. } catch (JMSException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. }

Listener2

  1. package com.fendo.mq.publishsubscribe;
  2. import javax.jms.JMSException;
  3. import javax.jms.Message;
  4. import javax.jms.MessageListener;
  5. import javax.jms.TextMessage;
  6. /**
  7. * 消息监听-订阅者二
  8. * @author fendo
  9. *
  10. */
  11. public class Listener2 implements MessageListener{
  12. @Override
  13. public void onMessage(Message message) {
  14. try {
  15. System.out.println("订阅者二收到的消息:"+((TextMessage)message).getText());
  16. } catch (JMSException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. }

三、测试

首先运行两个主题订阅者,运行之后,在ActiveMQ控制台中可以看到一些信息

20170722214137124

“Number Of Consumers” 消费者的数量为2,然后运行发布者,再次查看页面会变成如下

20170722214744676

完整示例如下: http://download.csdn.net/detail/u011781521/9907601

发表评论

表情:
评论列表 (有 0 条评论,408人围观)

还没有评论,来说两句吧...

相关阅读