ActiveMQ(4)-发布订阅 逃离我推掉我的手 2022-06-05 00:10 168阅读 0赞 前面介绍的producer和consumer为点对点的消息,本篇介绍一下activemq的一对多消息。 发布订阅模式类似于日常生活中订阅报纸。每到年尾的时候,邮局就会发报纸集合让选择订阅那一种。在这个表里面列出所有出版社发行的报纸,那么对于我们每一个订阅者来说,我们可以选择一份或者多分报纸。比如人名日报,环球时报等。那么这些我们的报纸就相当于发布订阅模式里面的topic。有很多人订阅报纸,也有人可能和我们订阅相同的报纸。那么在这里相当于我们在同一个topic里面注册了。对于一份报纸发行方来说,他和所有的订阅者就构成了一个一对多的关系。 # 1.发布者 # package com.mq.activemq_01.pb; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Publish { private ConnectionFactory connectionFactory; private Connection connection; private Session session; private MessageProducer messageProducer; public Publish(){ try { this.connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616"); this.connection = this.connectionFactory.createConnection(); this.connection.start(); this.session = this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); this.messageProducer = this.session.createProducer(null); } catch (Exception e) { // TODO: handle exception } } public void sendMessage(){ try { Destination destination = this.session.createTopic("topic1"); TextMessage textMessage = (TextMessage)this.session.createTextMessage("我是发布订阅消息"); messageProducer.send(destination, textMessage); } catch (Exception e) { // TODO: handle exception } } public static void main(String[] args) { Publish publish = new Publish(); publish.sendMessage(); } } # 2.订阅者 # package com.mq.activemq_01.pb; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Consumer1 { private ConnectionFactory connectionFactory; private Connection connection; private Session session; private MessageConsumer messageConsumer; private Destination destination; public Consumer1(){ try { this.connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616"); this.connection = this.connectionFactory.createConnection(); this.connection.start(); this.session = this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); } catch (Exception e) { // TODO: handle exception } } public void receive(){ try { this.destination = this.session.createTopic("topic1"); this.messageConsumer = this.session.createConsumer(destination); this.messageConsumer.setMessageListener(new Listener()); } catch (Exception e) { // TODO: handle exception } } class Listener implements MessageListener{ public void onMessage(Message message) { try { if(message instanceof MapMessage){ MapMessage mm = (MapMessage) message; System.out.println(mm.toString()); String name = mm.getString("name"); String loginName = mm.getString("loginName"); String password = mm.getString("password"); int age = mm.getInt("age"); System.out.println(name); System.out.println(loginName); System.out.println(password); System.out.println(age); } if(message instanceof TextMessage){ System.out.println(message.toString()); System.out.println(((TextMessage) message).getText()); } } catch (JMSException e) { e.printStackTrace(); } } } public static void main(String[] args) { Consumer1 consumer = new Consumer1(); consumer.receive(); } } package com.mq.activemq_01.pb; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Consumer2 { private ConnectionFactory connectionFactory; private Connection connection; private Session session; private MessageConsumer messageConsumer; private Destination destination; public Consumer2(){ try { this.connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616"); this.connection = this.connectionFactory.createConnection(); this.connection.start(); this.session = this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); } catch (Exception e) { // TODO: handle exception } } public void receive(){ try { this.destination = this.session.createTopic("topic1"); this.messageConsumer = this.session.createConsumer(destination); this.messageConsumer.setMessageListener(new Listener()); } catch (Exception e) { // TODO: handle exception } } class Listener implements MessageListener{ public void onMessage(Message message) { try { if(message instanceof MapMessage){ MapMessage mm = (MapMessage) message; System.out.println(mm.toString()); String name = mm.getString("name"); String loginName = mm.getString("loginName"); String password = mm.getString("password"); int age = mm.getInt("age"); System.out.println(name); System.out.println(loginName); System.out.println(password); System.out.println(age); } if(message instanceof TextMessage){ System.out.println(message.toString()); System.out.println(((TextMessage) message).getText()); } } catch (JMSException e) { e.printStackTrace(); } } } public static void main(String[] args) { Consumer2 consumer = new Consumer2(); consumer.receive(); } } package com.mq.activemq_01.pb; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Consumer3 { private ConnectionFactory connectionFactory; private Connection connection; private Session session; private MessageConsumer messageConsumer; private Destination destination; public Consumer3(){ try { this.connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616"); this.connection = this.connectionFactory.createConnection(); this.connection.start(); this.session = this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); } catch (Exception e) { // TODO: handle exception } } public void receive(){ try { this.destination = this.session.createTopic("topic1"); this.messageConsumer = this.session.createConsumer(destination); this.messageConsumer.setMessageListener(new Listener()); } catch (Exception e) { // TODO: handle exception } } class Listener implements MessageListener{ public void onMessage(Message message) { try { if(message instanceof MapMessage){ MapMessage mm = (MapMessage) message; System.out.println(mm.toString()); String name = mm.getString("name"); String loginName = mm.getString("loginName"); String password = mm.getString("password"); int age = mm.getInt("age"); System.out.println(name); System.out.println(loginName); System.out.println(password); System.out.println(age); } if(message instanceof TextMessage){ System.out.println(message.toString()); System.out.println(((TextMessage) message).getText()); } } catch (JMSException e) { e.printStackTrace(); } } } public static void main(String[] args) { Consumer3 consumer = new Consumer3(); consumer.receive(); } }
还没有评论,来说两句吧...