WebSphereMq使用JMS发送消息和接收消息 痛定思痛。 2022-08-10 04:50 172阅读 0赞 1.WebSphereMq发送消息 package com.wsmq; import java.util.Scanner; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueReceiver; import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.TextMessage; import com.ibm.mq.jms.MQQueueConnectionFactory; /** * WebSphereMq发送消息 * @author suyunlong * */ public class MQSender implements MessageListener { MQQueueConnectionFactory mcf; QueueConnection qconn; final String HOSTNAME="127.0.0.1"; final int PORT=1414; final String QUEUEMANAGER_NAME="mq"; final String QUEUE_NAME="q1"; final String QUEUE_NAME2="q2"; boolean replyed=false; private void openConnection() throws JMSException { mcf=new MQQueueConnectionFactory(); mcf.setHostName(HOSTNAME); mcf.setPort(PORT); mcf.setQueueManager(QUEUEMANAGER_NAME); qconn=mcf.createQueueConnection(); qconn.start(); } private void sendMessage(String msgInfo) throws JMSException, InterruptedException { openConnection(); QueueSession session=qconn.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); Queue queue=session.createQueue(QUEUE_NAME); Queue responseQueue=session.createQueue(QUEUE_NAME2); QueueSender sender=session.createSender(queue); TextMessage msg=session.createTextMessage(); // msg.setJMSCorrelationID("123-123456"); // msg.setIntProperty("AccountID", 123); msg.setJMSReplyTo(responseQueue); //设置回复队列 msg.setText(msgInfo); sender.send(msg); System.out.println("消息发送:JMSMessage"+msg.getJMSMessageID()); //接收回复信息 System.out.println("等待客户端回复队列:"+msg.getJMSReplyTo()); String filter="JMSCorrelationID='"+msg.getJMSMessageID()+"'"; QueueReceiver reply=session.createReceiver(responseQueue,filter); //同步方式等待接收回复 // TextMessage resMsg = (TextMessage) reply.receive(60 * 1000); // if(resMsg != null){ // System.out.println("客户端回复消息 : " + resMsg.getText() + " JMSCorrelation" + resMsg.getJMSCorrelationID()); // }else{ // System.out.println("等待超时!"); // } //异步方式接收回复 reply.setMessageListener(this); while(!replyed) { Thread.sleep(1000); } qconn.stop(); sender.close(); session.close(); disConnection(); } public void onMessage(Message message) { try { String textMessage=((TextMessage)message).getText(); System.out.println("客户端回复消息 :"+textMessage+" JMSCorrelation"+message.getJMSCorrelationID()); } catch(JMSException e) { System.out.println(e.getMessage()); e.printStackTrace(); } finally { replyed=true; } } private void disConnection() throws JMSException { qconn.close(); } @SuppressWarnings("resource") public static void main(String[] args) throws JMSException, InterruptedException { MQSender ms=new MQSender(); Scanner scan=new Scanner(System.in); System.out.print("输入信息:"); ms.sendMessage(scan.next()); System.out.println("消息发送完毕!"); } } 2.WebSphereMq接收消息 package com.wsmq; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueReceiver; import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.TextMessage; import com.ibm.mq.jms.MQQueueConnectionFactory; /** * WebSphereMq接收消息 * @author suyunlong * */ public class MQRecevicer implements MessageListener { MQQueueConnectionFactory mcf; QueueConnection qconn; QueueSession session; final String HOSTNAME="127.0.0.1"; final int PORT=1414; final String QUEUEMANAGER_NAME="mq"; final String QUEUE_NAME="q1"; final String QUEUE_NAME2="q2"; boolean replyed=false; public void openConnection() throws JMSException { mcf=new MQQueueConnectionFactory(); mcf.setHostName(HOSTNAME); mcf.setPort(PORT); mcf.setQueueManager(QUEUEMANAGER_NAME); qconn=mcf.createQueueConnection(); qconn.start(); } public void disConnection() throws JMSException { qconn.close(); } public void recevicerMessage(String reply) throws JMSException, InterruptedException { openConnection(); session=qconn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue=session.createQueue(QUEUE_NAME); QueueReceiver recevier=session.createReceiver(queue); //同步方式接收消息并回复 // TextMessage textMessage = (TextMessage) recevier.receive(); // System.out.println("接收消息 : " + textMessage.getText() + " JMSMessage" +textMessage.getJMSMessageID()); // Queue responseQueue = (Queue) textMessage.getJMSReplyTo(); // if(responseQueue != null){ // TextMessage responseMsg = session.createTextMessage(); // responseMsg.setJMSCorrelationID(textMessage.getJMSMessageID()); // responseMsg.setText("This message is reply from client..."+textMessage.getText()); // session.createSender(responseQueue).send(responseMsg); // System.out.println("客户端回复队列:"+responseQueue.toString()+" JMSCorrelation"+responseMsg.getJMSCorrelationID()); // }else{ // System.out.println("服务端回复队列为空"); // } //异步方式接收消息并回复 recevier.setMessageListener(this); while(!replyed) { Thread.sleep(1000); } qconn.stop(); recevier.close(); session.close(); disConnection(); } public void onMessage(Message message) { try { String textMessage=((TextMessage)message).getText(); System.out.println("接收消息 : "+textMessage+" JMSMessage"+message.getJMSMessageID()); Queue responseQueue=(Queue)message.getJMSReplyTo(); if(responseQueue != null) { TextMessage responseMsg=session.createTextMessage(); responseMsg.setJMSCorrelationID(message.getJMSMessageID()); responseMsg.setText("This message is reply from client:"+textMessage); session.createSender(responseQueue).send(responseMsg); System.out.println("客户端回复队列:"+responseQueue+" JMSCorrelation"+responseMsg.getJMSCorrelationID()); } else { System.out.println("服务端回复队列为空"); } } catch(JMSException e) { System.out.println(e.getMessage()); e.printStackTrace(); } finally { replyed=true; } } public static void main(String[] args) throws JMSException, InterruptedException { MQRecevicer mr=new MQRecevicer(); System.out.println("正在接收消息..."); mr.recevicerMessage("消息已经收到,这是接收端的回复!"); System.out.println("消息接收完毕!"); } }
还没有评论,来说两句吧...