ActiveMQ 使用 JMS 发送消息和接收消息

水深无声 2022-08-10 04:50 422阅读 0赞

1.发送消息

  1. package com.activemq;
  2. import javax.jms.Connection;
  3. import javax.jms.DeliveryMode;
  4. import javax.jms.Destination;
  5. import javax.jms.JMSException;
  6. import javax.jms.MessageProducer;
  7. import javax.jms.Session;
  8. import javax.jms.TextMessage;
  9. import org.apache.activemq.ActiveMQConnection;
  10. import org.apache.activemq.ActiveMQConnectionFactory;
  11. /**
  12. * 发送消息
  13. * @author suyunlong
  14. *
  15. */
  16. public class ProducerTool
  17. {
  18. private String user=ActiveMQConnection.DEFAULT_USER;
  19. private String password=ActiveMQConnection.DEFAULT_PASSWORD;
  20. private String url=ActiveMQConnection.DEFAULT_BROKER_URL;
  21. private String subject="TOOL.DEFAULT";
  22. private Destination destination=null;
  23. private Connection connection=null;
  24. private Session session=null;
  25. private MessageProducer producer=null;
  26. // 初始化
  27. private void initialize() throws JMSException, Exception
  28. {
  29. ActiveMQConnectionFactory connectionFactory=new ActiveMQConnectionFactory(
  30. user, password, url);
  31. connection=connectionFactory.createConnection();
  32. session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
  33. destination=session.createQueue(subject);
  34. producer=session.createProducer(destination);
  35. producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
  36. }
  37. // 发送消息
  38. public void produceMessage(String message) throws JMSException, Exception
  39. {
  40. initialize();
  41. TextMessage msg=session.createTextMessage(message);
  42. connection.start();
  43. System.out.println("Producer:->Sending message: "+message);
  44. producer.send(msg);
  45. System.out.println("Producer:->Message sent complete!");
  46. }
  47. // 关闭连接
  48. public void close() throws JMSException
  49. {
  50. System.out.println("Producer:->Closing connection");
  51. if(producer != null)
  52. {
  53. producer.close();
  54. }
  55. if(session != null)
  56. {
  57. session.close();
  58. }
  59. if(connection != null)
  60. {
  61. connection.close();
  62. }
  63. }
  64. }

2.接收消息

  1. package com.activemq;
  2. import javax.jms.Connection;
  3. import javax.jms.Destination;
  4. import javax.jms.JMSException;
  5. import javax.jms.MessageConsumer;
  6. import javax.jms.Session;
  7. import javax.jms.MessageListener;
  8. import javax.jms.Message;
  9. import javax.jms.TextMessage;
  10. import org.apache.activemq.ActiveMQConnection;
  11. import org.apache.activemq.ActiveMQConnectionFactory;
  12. /**
  13. * 接受消息,我用的是基于消息监听的机制,需要实现MessageListener接口,这个接口有个onMessage方法,
  14. * 当接受到消息的时候会自动调用这个函数对消息进行处理。
  15. * 如果想主动的去接受消息,而不用消息监听的话,把consumer.setMessageListener(this)改为
  16. * Message message=consumer.receive(),手动去调用MessageConsumer的receive方法即可。
  17. * @author suyunlong
  18. *
  19. */
  20. public class ConsumerTool implements MessageListener
  21. {
  22. private String user=ActiveMQConnection.DEFAULT_USER;
  23. private String password=ActiveMQConnection.DEFAULT_PASSWORD;
  24. private String url=ActiveMQConnection.DEFAULT_BROKER_URL;
  25. private String subject="TOOL.DEFAULT";
  26. private Destination destination=null;
  27. private Connection connection=null;
  28. private Session session=null;
  29. private MessageConsumer consumer=null;
  30. // 初始化
  31. private void initialize() throws JMSException, Exception
  32. {
  33. //连接工厂是用户创建连接的对象,这里使用的是ActiveMQ的ActiveMQConnectionFactory根据url,username和password创建连接工厂。
  34. ActiveMQConnectionFactory connectionFactory=new ActiveMQConnectionFactory(
  35. user, password, url);
  36. //连接工厂创建一个jms connection
  37. connection=connectionFactory.createConnection();
  38. //是生产和消费的一个单线程上下文。会话用于创建消息的生产者,消费者和消息。会话提供了一个事务性的上下文。
  39. session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE); //不支持事务
  40. //目的地是客户用来指定他生产消息的目标还有他消费消息的来源的对象,两种消息传递方式:点对点和发布/订阅
  41. destination=session.createQueue(subject);
  42. //会话创建消息的生产者将消息发送到目的地
  43. consumer=session.createConsumer(destination);
  44. }
  45. // 消费消息
  46. public void consumeMessage() throws JMSException, Exception
  47. {
  48. initialize();
  49. connection.start();
  50. System.out.println("Consumer:->Begin listening...");
  51. // 开始监听
  52. consumer.setMessageListener(this);
  53. // Message message = consumer.receive();
  54. }
  55. // 关闭连接
  56. public void close() throws JMSException
  57. {
  58. System.out.println("Consumer:->Closing connection");
  59. if(consumer != null)
  60. {
  61. consumer.close();
  62. }
  63. if(session != null)
  64. {
  65. session.close();
  66. }
  67. if(connection != null)
  68. {
  69. connection.close();
  70. }
  71. }
  72. // 消息处理函数
  73. public void onMessage(Message message)
  74. {
  75. try
  76. {
  77. if(message instanceof TextMessage)
  78. {
  79. TextMessage txtMsg=(TextMessage)message;
  80. String msg=txtMsg.getText();
  81. System.out.println("Consumer:->Received: " + msg);
  82. }
  83. else
  84. {
  85. System.out.println("Consumer:->Received: " + message);
  86. }
  87. }
  88. catch(JMSException e)
  89. {
  90. System.out.println(e.getMessage());
  91. e.printStackTrace();
  92. }
  93. }
  94. }

3.测试

  1. package com.activemq;
  2. import org.apache.activemq.ActiveMQConnection;
  3. public class Test
  4. {
  5. public static void main(String[] args)
  6. {
  7. ConsumerTool consumer=new ConsumerTool();
  8. ProducerTool producer=new ProducerTool();
  9. System.out.println(ActiveMQConnection.DEFAULT_BROKER_URL+"------------");
  10. try
  11. {
  12. // 开始监听
  13. producer.produceMessage("Hello, world!");
  14. producer.close();
  15. consumer.consumeMessage();
  16. // 延时500毫秒之后停止接受消息
  17. Thread.sleep(5000);
  18. consumer.close();
  19. }
  20. catch(Exception e)
  21. {
  22. System.out.println(e.getMessage());
  23. e.printStackTrace();
  24. }
  25. }
  26. }

发表评论

表情:
评论列表 (有 0 条评论,422人围观)

还没有评论,来说两句吧...

相关阅读