Activemq 点对点模式测试代码

曾经终败给现在 2024-04-17 06:25 130阅读 0赞
  1. import javax.jms.Connection;
  2. import javax.jms.ConnectionFactory;
  3. import javax.jms.JMSException;
  4. import javax.jms.Message;
  5. import javax.jms.MessageConsumer;
  6. import javax.jms.MessageListener;
  7. import javax.jms.MessageProducer;
  8. import javax.jms.Queue;
  9. import javax.jms.TextMessage;
  10. import javax.jms.Session;
  11. import org.apache.activemq.ActiveMQConnectionFactory;
  12. import org.junit.Test;
  13. public class ActivemqTest {
  14. /**
  15. * 点到点模式发送消息
  16. */
  17. @Test
  18. public void testQueueProducer() throws Exception {
  19. // 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
  20. //brokerURL服务器的ip及端口号
  21. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.100:61616");
  22. // 第二步:使用ConnectionFactory对象创建一个Connection对象。
  23. javax.jms.Connection connection=connectionFactory.createConnection();
  24. // 第三步:开启连接,调用Connection对象的start方法。
  25. connection.start();
  26. // 第四步:使用Connection对象创建一个Session对象。
  27. //第一个参数:是否开启事务。true:开启事务,第二个参数忽略。
  28. //第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
  29. Session session= connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  30. // 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
  31. //参数:队列的名称。
  32. Queue queue= session.createQueue("test-queue");
  33. // 第六步:使用Session对象创建一个Producer对象。
  34. MessageProducer producer = session.createProducer(queue);
  35. // 第七步:创建一个Message对象,创建一个TextMessage对象。
  36. /*TextMessage message = new ActiveMQTextMessage();
  37. message.setText("hello activeMq,this is my first test.");*/
  38. TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");
  39. // 第八步:使用Producer对象发送消息。
  40. producer.send(textMessage);
  41. // 第九步:关闭资源。
  42. producer.close();
  43. session.close();
  44. connection.close();
  45. }
  46. @Test
  47. public void testQueueConsumer() throws Exception {
  48. // 第一步:创建一个ConnectionFactory对象。
  49. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.100:61616");
  50. // 第二步:从ConnectionFactory对象中获得一个Connection对象。
  51. //Connection connection = connectionFactory.createConnection();
  52. Connection connection=connectionFactory.createConnection();
  53. // 第三步:开启连接。调用Connection对象的start方法。
  54. connection.start();
  55. // 第四步:使用Connection对象创建一个Session对象。
  56. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  57. // 第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
  58. Queue queue = session.createQueue("test-queue");
  59. // 第六步:使用Session对象创建一个Consumer对象。
  60. MessageConsumer consumer = session.createConsumer(queue);
  61. // 第七步:接收消息。
  62. consumer.setMessageListener(new MessageListener() {
  63. @Override
  64. public void onMessage(Message message) {
  65. try {
  66. TextMessage textMessage = (TextMessage) message;
  67. String text = null;
  68. //取消息的内容
  69. text = textMessage.getText();
  70. // 第八步:打印消息。
  71. System.out.println(text);
  72. } catch (JMSException e) {
  73. e.printStackTrace();
  74. }
  75. }
  76. });
  77. //等待键盘输入
  78. System.in.read();
  79. // 第九步:关闭资源
  80. consumer.close();
  81. session.close();
  82. connection.close();
  83. }
  84. }

发表评论

表情:
评论列表 (有 0 条评论,130人围观)

还没有评论,来说两句吧...

相关阅读

    相关 activeMQ 方式

    在点对点的传输方式中,消息数据被持久化,每条消息都能被消费,没有监听QUEUE地址也能被消费,数据不会丢失,一对一的发布接受策略,保证数据完整。 点对点的模式主要建立在一个队