ActiveMQ实战(四)--ActiveMQ的通信方式之request-response(请求响应模式)

朱雀 2022-06-12 12:09 242阅读 0赞

一、简介

在前面的两种模式中都是一方负责发送消息而另外一方负责处理。而我们实际中的很多应用相当于一种一应一答的过程,需要双方都能给对方发送消息。于是请求-应答的这种通信方式也很重要。它也应用的很普遍。

请求-应答方式并不是JMS规范系统默认提供的一种通信方式,而是通过在现有通信方式的基础上稍微运用一点技巧实现的。

下图是典型的请求-应答方式的交互过程:

20170722231617454

在JMS里面,如果要实现请求/应答的方式,可以利用JMSReplyTo和JMSCorrelationID消息头来将通信的双方关联起来。另外,QueueRequestor和TopicRequestor能够支持简单的请求/应答过程。

現在,如果我们要实现这么一个过程,在发送请求消息并且等待返回结果的client端的流程如下:

  1. // client side
  2. Destination tempDest = session.createTemporaryQueue();
  3. MessageConsumer responseConsumer = session.createConsumer(tempDest);
  4. ...
  5. // send a request..
  6. message.setJMSReplyTo(tempDest)
  7. message.setJMSCorrelationID(myCorrelationID);
  8. producer.send(message);

client端创建一个临时队列并在发送的消息里指定了发送返回消息的destination以及correlationID。那么在处理消息的server端得到这个消息后就知道该发送给谁了。

Server端的大致流程如下:

  1. public void onMessage(Message request) {
  2. Message response = session.createMessage();
  3. response.setJMSCorrelationID(request.getJMSCorrelationID())
  4. producer.send(request.getJMSReplyTo(), response)
  5. }

这里我们是用server端注册MessageListener,通过设置返回信息的CorrelationID和JMSReplyTo将信息返回。

以上就是发送和接收消息的双方的大致程序结构。

二、实战request-response(请求响应模式)通信

具体的实现代码如下:

Client:

  1. package com.fendo.mq;
  2. import org.apache.activemq.ActiveMQConnectionFactory;
  3. import javax.jms.*;
  4. import java.util.Random;
  5. /**
  6. * 客户端
  7. * @author fendo
  8. *
  9. */
  10. public class Client implements MessageListener {
  11. private static int ackMode;
  12. private static String clientQueueName;
  13. private boolean transacted = false;
  14. private MessageProducer producer;
  15. static {
  16. clientQueueName = "client.messages";
  17. ackMode = Session.AUTO_ACKNOWLEDGE;
  18. }
  19. public Client() {
  20. ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
  21. Connection connection;
  22. try {
  23. connection = connectionFactory.createConnection();
  24. connection.start();
  25. Session session = connection.createSession(transacted, ackMode);
  26. Destination adminQueue = session.createQueue(clientQueueName);
  27. //设置消息生成器将消息发送到服务器正在消耗的队列
  28. this.producer = session.createProducer(adminQueue);
  29. this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
  30. //创建一个临时队列,该客户端将侦听响应,
  31. //然后创建一个从该临时队列消耗消息的消费者...对于真正的应用程序,
  32. //客户端应该为服务器的每个消息重新使用相同的临时队列...一个临时队列 每个客户端
  33. Destination tempDest = session.createTemporaryQueue();
  34. MessageConsumer responseConsumer = session.createConsumer(tempDest);
  35. //此类也将处理到临时队列的消息
  36. responseConsumer.setMessageListener(this);
  37. //现在创建您要发送的实际消息
  38. TextMessage txtMessage = session.createTextMessage();
  39. // 设置信息
  40. txtMessage.setText("MyProtocolMessage");
  41. //将回复字段设置为上面创建的临时队列,这是服务器应答的队列...
  42. txtMessage.setJMSReplyTo(tempDest);
  43. //设置相关ID,以便当您收到响应时,您知道响应是哪个发送消息
  44. //如果没有多个未完成的消息给服务器,那么
  45. //相同的相关ID可以用于所有的消息...如果有多个未完成
  46. //消息到您可能想要将相关ID与此关联的服务器
  47. //消息不知何故...一个地图很好
  48. String correlationId = this.createRandomString();
  49. txtMessage.setJMSCorrelationID(correlationId);
  50. this.producer.send(txtMessage);
  51. } catch (JMSException e) {
  52. //妥善处理异常
  53. }
  54. }
  55. private String createRandomString() {
  56. Random random = new Random(System.currentTimeMillis());
  57. long randomLong = random.nextLong();
  58. return Long.toHexString(randomLong);
  59. }
  60. public void onMessage(Message message) {
  61. String messageText = null;
  62. try {
  63. if (message instanceof TextMessage) {
  64. TextMessage textMessage = (TextMessage) message;
  65. messageText = textMessage.getText();
  66. System.out.println("响应内容 = " + messageText);
  67. }
  68. } catch (JMSException e) {
  69. //妥善处理异常
  70. }
  71. }
  72. public static void main(String[] args) {
  73. new Client();
  74. }
  75. }

这里的代码除了初始化构造函数里的参数还同时设置了两个destination,一个是自己要发送消息出去的destination,在session.createProducer(adminQueue);这一句设置。

另外一个是自己要接收的消息destination, 通过Destination tempDest = session.createTemporaryQueue(); responseConsumer = session.createConsumer(tempDest); 这两句指定了要接收消息的目的地。这里是用的一个临时队列。在前面指定了返回消息的通信队列之后,我们需要通知server端知道发送返回消息给哪个队列。于是txtMessage.setJMSReplyTo(tempDest);指定了这一部分,同时txtMessage.setJMSCorrelationID(correlationId);方法主要是为了保证每次发送回来请求的server端能够知道对应的是哪个请求。这里一个请求和一个应答是相当于对应一个相同的序列号一样。

  1. 同时,因为client端在发送消息之后还要接收server端返回的消息,所以它也要实现一个消息receiver的功能。这里采用实现MessageListener接口的方式:
  2. public void onMessage(Message message) {
  3. String messageText = null;
  4. try {
  5. if (message instanceof TextMessage) {
  6. TextMessage textMessage = (TextMessage) message;
  7. messageText = textMessage.getText();
  8. System.out.println("messageText = " + messageText);
  9. }
  10. } catch (JMSException e) {
  11. //Handle the exception appropriately
  12. }
  13. }

Server:

这里server端要执行的过程和client端相反,它是先接收消息,在接收到消息后根据提供的JMSCorelationID来发送返回的消息:

  1. public void onMessage(Message message) {
  2. try {
  3. TextMessage response = this.session.createTextMessage();
  4. if (message instanceof TextMessage) {
  5. TextMessage txtMsg = (TextMessage) message;
  6. String messageText = txtMsg.getText();
  7. response.setText(this.messageProtocol.handleProtocolMessage(messageText));
  8. }
  9. //从接收到的消息中设置相关ID为响应消息的相关ID
  10. //这可以让客户端识别该消息的响应
  11. //向服务器发送的一个未完成的消息
  12. response.setJMSCorrelationID(message.getJMSCorrelationID());
  13. //将响应发送到接收消息的JMSReplyTo字段指定的目的地,
  14. //这大概是客户创建的临时队列
  15. this.replyProducer.send(message.getJMSReplyTo(), response);
  16. } catch (JMSException e) {
  17. //妥善处理异常
  18. }
  19. }

前面,在replyProducer.send()方法里,message.getJMSReplyTo()就得到了要发送消息回去的destination。
另外,设置这些发送返回信息的replyProducer的信息主要在构造函数相关的方法里实现了:

  1. public Server() {
  2. try {
  3. //这个消息代理是嵌入的
  4. BrokerService broker = new BrokerService();
  5. broker.setPersistent(false);
  6. broker.setUseJmx(false);
  7. broker.addConnector(messageBrokerUrl);
  8. broker.start();
  9. } catch (Exception e) {
  10. //妥善处理异常
  11. }
  12. //将消息的处理委托给另一个类,在设置JMS之前实例化它,这样它就可以处理消息了
  13. this.messageProtocol = new MessageProtocol();
  14. this.setupMessageQueueConsumer();
  15. }
  16. private void setupMessageQueueConsumer() {
  17. ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(messageBrokerUrl);
  18. Connection connection;
  19. try {
  20. connection = connectionFactory.createConnection();
  21. connection.start();
  22. this.session = connection.createSession(this.transacted, ackMode);
  23. Destination adminQueue = this.session.createQueue(messageQueueName);
  24. //设置一个消息生成器响应来自客户端的消息,我们将从一个消息发送到从jmsreplytoheader字段发送到的目的地
  25. this.replyProducer = this.session.createProducer(null);
  26. this.replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
  27. //设置消费者从管理队列中消费消息
  28. MessageConsumer consumer = this.session.createConsumer(adminQueue);
  29. consumer.setMessageListener(this);
  30. } catch (JMSException e) {
  31. //妥善处理异常
  32. }
  33. }

总体来说,整个的交互过程并不复杂,只是比较繁琐。对于请求/应答的方式来说,这种典型交互的过程就是Client端在设定正常发送请求的Queue同时也设定一个临时的Queue。同时在要发送的message里头指定要返回消息的destination以及CorelationID,这些就好比是一封信里面所带的回执。根据这个信息人家才知道怎么给你回信。对于Server端来说则要额外创建一个producer,在处理接收到消息的方法里再利用producer将消息发回去。这一系列的过程看起来很像http协议里面请求-应答的方式,都是一问一答。

MessageProtocol

  1. package com.fendo.mq;
  2. /**
  3. * 此类需要运行上面的客户端/服务器示例。 将消息处理委托给单独的类仅仅是个人喜好。
  4. * @author fendo
  5. *
  6. */
  7. public class MessageProtocol {
  8. public String handleProtocolMessage(String messageText) {
  9. String responseText;
  10. // 判断是否是client传过来的信息,在这里就可以做些解析
  11. if ("MyProtocolMessage".equalsIgnoreCase(messageText)) {
  12. responseText = "我收到了信息";
  13. } else {
  14. responseText = "我不知道你传的是什么: " + messageText;
  15. }
  16. return responseText;
  17. }
  18. }

三、运行测试

先运行客户端(请求),在ActiveMQ控制台进行查看,会多出一个记录

20170723123642137

“Number Of Pending Messages “等待消费的数量为”1”,进入队列的消息为”1”,然后运行服务端(响应),会收到信息

20170723123937803

再查看控制面板信息会变成如下所示

20170723124207163

“Number Of Consumers”消费者为”1”,”Messages Enqueued”入队数量为”1”,”Messages Dequeued”出队数量为”1”。

完整示例: http://download.csdn.net/detail/u011781521/9907835

参考:http://activemq.apache.org/how-should-i-implement-request-response-with-jms.html

发表评论

表情:
评论列表 (有 0 条评论,242人围观)

还没有评论,来说两句吧...

相关阅读

    相关 activemq几种基本通信方式总结

    简介      在前面一篇文章里讨论过几种应用系统集成的方式,发现实际上面向消息队列的集成方案算是一个总体比较合理的选择。这里,我们先针对具体的一个消息队列Activem