JMS调用IBM MQ应用--点对点模式
第一篇主要讨论了IBM MQ的安装以及调试样例遇到的几个问题。
这一篇文章主要针对点对点模式来学习一下。学习的样例来源为IBM MQ的sample中的例子。
点对点模式下有一个消息生产者和一个消息消费者,每条消息只能被消费一次。
JmsProducer 是消息的生产者。
Java代码
- package test;
- // SCCSID “@(#) MQMBID sn=p000-L120604 su=_H-IvIK4nEeGko6IWl3MDhA pn=MQJavaSamples/jms/JmsProducer.java”
- /*
- * <copyright
- * notice=”lm-source-program”
- * pids=”5724-H72,5655-R36,5655-L82,5724-L26,”
- * years=”2008,2012”
- * crc=”279216363” >
- * Licensed Materials - Property of IBM
- *
- * 5724-H72,5655-R36,5655-L82,5724-L26,
- *
- * (C) Copyright IBM Corp. 2008, 2012 All Rights Reserved.
- *
- * US Government Users Restricted Rights - Use, duplication or
- * disclosure restricted by GSA ADP Schedule Contract with
- * IBM Corp.
- *
- */
- import javax.jms.Connection;
- import javax.jms.Destination;
- import javax.jms.JMSException;
- import javax.jms.MessageProducer;
- import javax.jms.Session;
- import javax.jms.TextMessage;
- import com.ibm.msg.client.jms.JmsConnectionFactory;
- import com.ibm.msg.client.jms.JmsFactoryFactory;
- import com.ibm.msg.client.wmq.WMQConstants;
- /**
- * A JMS producer (sender or publisher) application that sends a simple message to the named
- * destination (queue or topic).
- *
- * Notes:
- *
- * API type: IBM JMS API (v1.1, unified domain)
- *
- * Messaging domain: Point-to-point or Publish-Subscribe
- *
- * Provider type: WebSphere MQ
- *
- * Connection mode: Client connection
- *
- * JNDI in use: No
- *
- * Usage:
- *
- * JmsProducer -m queueManagerName -d destinationName [-h host -p port -l channel]
- *
- * for example:
- *
- * JmsProducer -m QM1 -d Q1
- *
- * JmsProducer -m QM1 -d topic://foo -h localhost -p 1414
- */
- public class JmsProducer {
- private static String host = “localhost”;
- private static int port = 1414;
- private static String channel = “SYSTEM.DEF.SVRCONN”;
- private static String queueManagerName = null;
- private static String destinationName = null;
- //这里用来判断是不是点对点模式
- private static boolean isTopic = false;
- // System exit status value (assume unset value to be 1)
- private static int status = 1;
- /**
- * Main method
- *
- * @param args
- */
- public static void main(String[] args) {
- // Parse the arguments
- //队列管理器名称如果出现下划线的话会提示( ‘MQCC_FAILED’ ),原因为 ‘2058’ ( ‘MQRC_Q_MGR_NAME_ERROR’ )。
- args = new String[]{ “-m”,”QMTest”, “-d”,”testQueue”};
- // args = new String[]{“-m”,”aaaa”, “-d”,”topic://zhuti”,”-h”,”localhost”,”-p”,”14114”};
- parseArgs(args);
- // Variables
- Connection connection = null;
- Session session = null;
- Destination destination = null;
- MessageProducer producer = null;
- try {
- // Create a connection factory
- //JmsFactoryFactory用来根据指定类型来创建connection factory和destination objects
- JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
- //根据工厂工厂创建连接工厂类的实例
- JmsConnectionFactory cf = ff.createConnectionFactory();
- // Set the properties
- //封装连接信息
- //使用JmsPropertyContext接口中的方法封装信息
- cf.setStringProperty(WMQConstants.WMQ_HOST_NAME, host);
- cf.setIntProperty(WMQConstants.WMQ_PORT, port);
- //SYSTEM.DEF.SVRCONN是通道的连接类型
- cf.setStringProperty(WMQConstants.WMQ_CHANNEL, channel);
- //WMQ_CM_CLIENT的含义,什么时候用,目前还不清楚
- cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
- cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, queueManagerName);
- // Create JMS objects
- connection = cf.createConnection();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- if (isTopic) {
- destination = session.createTopic(destinationName);
- }
- else {
- destination = session.createQueue(destinationName);
- }
- producer = session.createProducer(destination);
- long uniqueNumber = System.currentTimeMillis() % 1000;
- TextMessage message = session.createTextMessage(“JmsProducer: Your lucky number today is “
- uniqueNumber);
- // Start the connection
- connection.start();
- // And, send the message
- producer.send(message);
- System.out.println(“Sent message:\n” + message);
- recordSuccess();
- }
- catch (JMSException jmsex) {
- recordFailure(jmsex);
- }
- finally {
- if (producer != null) {
- try {
- producer.close();
- }
- catch (JMSException jmsex) {
- System.out.println(“Producer could not be closed.”);
- recordFailure(jmsex);
- }
- }
- if (session != null) {
- try {
- session.close();
- }
- catch (JMSException jmsex) {
- System.out.println(“Session could not be closed.”);
- recordFailure(jmsex);
- }
- }
- if (connection != null) {
- try {
- connection.close();
- }
- catch (JMSException jmsex) {
- System.out.println(“Connection could not be closed.”);
- recordFailure(jmsex);
- }
- }
- }
- System.exit(status);
- return;
- } // end main()
- /**
- * Process a JMSException and any associated inner exceptions.
- *
- * @param jmsex
- */
- private static void processJMSException(JMSException jmsex) {
- System.out.println(jmsex);
- Throwable innerException = jmsex.getLinkedException();
- if (innerException != null) {
- System.out.println(“Inner exception(s):”);
- }
- while (innerException != null) {
- System.out.println(innerException);
- innerException = innerException.getCause();
- }
- return;
- }
- /**
- * Record this run as successful.
- */
- private static void recordSuccess() {
- System.out.println(“SUCCESS”);
- status = 0;
- return;
- }
- /**
- * Record this run as failure.
- *
- * @param ex
- */
- private static void recordFailure(Exception ex) {
- if (ex != null) {
- if (ex instanceof JMSException) {
- processJMSException((JMSException) ex);
- }
- else {
- System.out.println(ex);
- }
- }
- System.out.println(“FAILURE”);
- status = -1;
- return;
- }
- /**
- * Parse user supplied arguments.
- *
- * @param args
- */
- private static void parseArgs(String[] args) {
- try {
- int length = args.length;
- if (length == 0) {
- throw new IllegalArgumentException(“No arguments! Mandatory arguments must be specified.”);
- }
- if ((length % 2) != 0) {
- throw new IllegalArgumentException(“Incorrect number of arguments!”);
- }
- int i = 0;
- while (i < length) {
- if ((args[i]).charAt(0) != ‘-‘) {
- throw new IllegalArgumentException(“Expected a ‘-‘ character next: “ + args[i]);
- }
- char opt = (args[i]).toLowerCase().charAt(1);
- switch (opt) {
- case ‘h’ :
- host = args[++i];
- break;
- case ‘p’ :
- port = Integer.parseInt(args[++i]);
- break;
- case ‘l’ :
- channel = args[++i];
- break;
- case ‘m’ :
- queueManagerName = args[++i];
- break;
- case ‘d’ :
- destinationName = args[++i];
- break;
- default : {
- throw new IllegalArgumentException(“Unknown argument: “ + opt);
- }
- }
- ++i;
- }
- if (queueManagerName == null) {
- throw new IllegalArgumentException(“A queueManager name must be specified.”);
- }
- if (destinationName == null) {
- throw new IllegalArgumentException(“A destination name must be specified.”);
- }
- // Whether the destination is a queue or a topic. Apply a simple check.
- if (destinationName.startsWith(“topic://“)) {
- isTopic = true;
- }
- else {
- // Otherwise, let’s assume it is a queue.
- isTopic = false;
- }
- }
- catch (Exception e) {
- System.out.println(e.getMessage());
- printUsage();
- System.exit(-1);
- }
- return;
- }
- /**
- * Display usage help.
- */
- private static void printUsage() {
- System.out.println(“\nUsage:”);
- System.out
- .println(“JmsProducer -m queueManagerName -d destinationName [-h host -p port -l channel]“);
- return;
- }
- } // end class
注意:如果还抛( ‘MQCC_FAILED’ ),原因为 ‘2035’ ( ‘MQRC_NOT_AUTHORIZED’ )这个异常,请看上一篇文章。
下面是JmsConsumer。JmsConsumer的连接方式和JmsProducer一样,不再赘述。
JmsProducer和JmsConsumer都需要指定主机名、端口、通道、队列管理器名称和队列名称。
Java代码
- package test;
- // SCCSID “@(#) MQMBID sn=p000-L120604 su=_H-IvIK4nEeGko6IWl3MDhA pn=MQJavaSamples/jms/JmsConsumer.java”
- /*
- * <copyright
- * notice=”lm-source-program”
- * pids=”5724-H72,5655-R36,5655-L82,5724-L26,”
- * years=”2008,2012”
- * crc=”39457954” >
- * Licensed Materials - Property of IBM
- *
- * 5724-H72,5655-R36,5655-L82,5724-L26,
- *
- * (C) Copyright IBM Corp. 2008, 2012 All Rights Reserved.
- *
- * US Government Users Restricted Rights - Use, duplication or
- * disclosure restricted by GSA ADP Schedule Contract with
- * IBM Corp.
- *
- */
- import javax.jms.Connection;
- import javax.jms.Destination;
- import javax.jms.JMSException;
- import javax.jms.Message;
- import javax.jms.MessageConsumer;
- import javax.jms.Session;
- import com.ibm.msg.client.jms.JmsConnectionFactory;
- import com.ibm.msg.client.jms.JmsFactoryFactory;
- import com.ibm.msg.client.wmq.WMQConstants;
- /**
- * A JMS consumer (receiver or subscriber) application that receives a message from the named
- * destination (queue or topic).
- *
- * Tip: A subscriber application must be started before the publisher application.
- *
- * Notes:
- *
- * API type: IBM JMS API (v1.1, unified domain)
- *
- * Messaging domain: Point-to-point or Publish-Subscribe
- *
- * Provider type: WebSphere MQ
- *
- * Connection mode: Client connection
- *
- * JNDI in use: No
- *
- * Usage:
- *
- * JmsConsumer -m queueManagerName -d destinationName [-h host -p port -l channel]
- *
- * for example:
- *
- * JmsConsumer -m QM1 -d Q1
- *
- * JmsConsumer -m QM1 -d topic://foo -h localhost -p 1414
- */
- public class JmsConsumer {
- private static String host = “localhost”;
- private static int port = 1414;
- private static String channel = “SYSTEM.DEF.SVRCONN”;
- private static String queueManagerName = null;
- private static String destinationName = null;
- private static boolean isTopic = false;
- private static int timeout = 15000; // in ms or 15 seconds
- // System exit status value (assume unset value to be 1)
- private static int status = 1;
- /**
- * Main method
- *
- * @param args
- */
- public static void main(String[] args) {
- // args = new String[]{“-m”,”aaaa”, “-d”,”aa”};
- // args = new String[]{“-m”,”aaaa”, “-d”,”topic://zhuti”,”-h”,”localhost”,”-p”,”1414”};
- // Parse the arguments
- args = new String[]{ “-m”,”QMTest”, “-d”,”testQueue”};
- parseArgs(args);
- // Variables
- Connection connection = null;
- Session session = null;
- Destination destination = null;
- MessageConsumer consumer = null;
- try {
- // Create a connection factory
- JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
- JmsConnectionFactory cf = ff.createConnectionFactory();
- // Set the properties
- cf.setStringProperty(WMQConstants.WMQ_HOST_NAME, host);
- cf.setIntProperty(WMQConstants.WMQ_PORT, port);
- cf.setStringProperty(WMQConstants.WMQ_CHANNEL, channel);
- cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
- cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, queueManagerName);
- // Create JMS objects
- connection = cf.createConnection();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- if (isTopic) {
- destination = session.createTopic(destinationName);
- }
- else {
- destination = session.createQueue(destinationName);
- }
- consumer = session.createConsumer(destination);
- // Start the connection
- connection.start();
- // And, receive the message
- //在指定的超市时间内接收下一条消息
- Message message = consumer.receive(timeout);
- if (message != null) {
- // System.err.println(“Received message:\n” + message);
- System.out.println(“Received message:\n” + message);
- }
- else {
- System.out.println(“No message received!\n”);
- recordFailure(null);
- }
- recordSuccess();
- }
- catch (JMSException jmsex) {
- recordFailure(jmsex);
- }
- finally {
- if (consumer != null) {
- try {
- consumer.close();
- }
- catch (JMSException jmsex) {
- System.out.println(“Consumer could not be closed.”);
- recordFailure(jmsex);
- }
- }
- if (session != null) {
- try {
- session.close();
- }
- catch (JMSException jmsex) {
- System.out.println(“Session could not be closed.”);
- recordFailure(jmsex);
- }
- }
- if (connection != null) {
- try {
- connection.close();
- }
- catch (JMSException jmsex) {
- System.out.println(“Connection could not be closed.”);
- recordFailure(jmsex);
- }
- }
- }
- System.exit(status);
- return;
- } // end main()
- /**
- * Process a JMSException and any associated inner exceptions.
- *
- * @param jmsex
- */
- private static void processJMSException(JMSException jmsex) {
- System.out.println(jmsex);
- Throwable innerException = jmsex.getLinkedException();
- if (innerException != null) {
- System.out.println(“Inner exception(s):”);
- }
- while (innerException != null) {
- System.out.println(innerException);
- innerException = innerException.getCause();
- }
- return;
- }
- /**
- * Record this run as successful.
- */
- private static void recordSuccess() {
- System.out.println(“SUCCESS”);
- status = 0;
- return;
- }
- /**
- * Record this run as failure.
- *
- * @param ex
- */
- private static void recordFailure(Exception ex) {
- if (ex != null) {
- if (ex instanceof JMSException) {
- processJMSException((JMSException) ex);
- }
- else {
- System.out.println(ex);
- }
- }
- System.out.println(“FAILURE”);
- status = -1;
- return;
- }
- /**
- * Parse user supplied arguments.
- *
- * @param args
- */
- private static void parseArgs(String[] args) {
- try {
- int length = args.length;
- if (length == 0) {
- throw new IllegalArgumentException(“No arguments! Mandatory arguments must be specified.”);
- }
- if ((length % 2) != 0) {
- throw new IllegalArgumentException(“Incorrect number of arguments!”);
- }
- int i = 0;
- while (i < length) {
- if ((args[i]).charAt(0) != ‘-‘) {
- throw new IllegalArgumentException(“Expected a ‘-‘ character next: “ + args[i]);
- }
- char opt = (args[i]).toLowerCase().charAt(1);
- switch (opt) {
- case ‘h’ :
- host = args[++i];
- break;
- case ‘p’ :
- port = Integer.parseInt(args[++i]);
- break;
- case ‘l’ :
- channel = args[++i];
- break;
- case ‘m’ :
- queueManagerName = args[++i];
- break;
- case ‘d’ :
- destinationName = args[++i];
- break;
- default : {
- throw new IllegalArgumentException(“Unknown argument: “ + opt);
- }
- }
- ++i;
- }
- if (queueManagerName == null) {
- throw new IllegalArgumentException(“A queueManager name must be specified.”);
- }
- if (destinationName == null) {
- throw new IllegalArgumentException(“A destination name must be specified.”);
- }
- // Whether the destination is a queue or a topic. Apply a simple check.
- if (destinationName.startsWith(“topic://“)) {
- isTopic = true;
- }
- else {
- // Otherwise, let’s assume it is a queue.
- isTopic = false;
- }
- }
- catch (Exception e) {
- System.out.println(e.getMessage());
- printUsage();
- System.exit(-1);
- }
- return;
- }
- /**
- * Display usage help.
- */
- private static void printUsage() {
- System.out.println(“\nUsage:”);
- System.out
- .println(“JmsConsumer -m queueManagerName -d destinationName [-h host -p port -l channel]“);
- return;
- }
- } // end class
生产者需要由session创建message producer,消费者需要由session创建message consumer。
JmsBrowser和JmsConsumer的区别在于前者只能浏览消息,后者是消费消息。前者能够浏览队列中所有的消息而不会将其移除,而后者一次消费一条消息。
Java代码
- package test;
- // SCCSID “@(#) MQMBID sn=p000-L120604 su=_H-IvIK4nEeGko6IWl3MDhA pn=MQJavaSamples/jms/JmsBrowser.java”
- /*
- * <copyright
- * notice=”lm-source-program”
- * pids=”5724-H72,5655-R36,5655-L82,5724-L26,”
- * years=”2008,2012”
- * crc=”3912865343” >
- * Licensed Materials - Property of IBM
- *
- * 5724-H72,5655-R36,5655-L82,5724-L26,
- *
- * (C) Copyright IBM Corp. 2008, 2012 All Rights Reserved.
- *
- * US Government Users Restricted Rights - Use, duplication or
- * disclosure restricted by GSA ADP Schedule Contract with
- * IBM Corp.
- *
- */
- import java.util.Enumeration;
- import javax.jms.Connection;
- import javax.jms.JMSException;
- import javax.jms.Message;
- import javax.jms.Queue;
- import javax.jms.QueueBrowser;
- import javax.jms.Session;
- import com.ibm.msg.client.jms.JmsConnectionFactory;
- import com.ibm.msg.client.jms.JmsFactoryFactory;
- import com.ibm.msg.client.wmq.WMQConstants;
- /**
- * A JMS queue browser application that looks at all available messages on the named queue, without
- * removing them, in the order they would be received by a consumer application.
- *
- * Tip: A browser is not applicable for topics.
- *
- * Notes:
- *
- * API type: IBM JMS API (v1.1, unified domain)
- *
- * Messaging domain: Point-to-point
- *
- * Provider type: WebSphere MQ
- *
- * Connection mode: Client connection
- *
- * JNDI in use: No
- *
- * Usage:
- *
- * JmsBrowser -m queueManagerName -d queueName [-h host -p port -l channel]
- *
- * for example:
- *
- * JmsBrowser -m QM1 -d Q1
- *
- * JmsBrowser -m QM1 -d Q1 -h localhost -p 1414
- */
- public class JmsBrowser {
- private static String host = “localhost”;
- private static int port = 1414;
- private static String channel = “SYSTEM.DEF.SVRCONN”;
- private static String queueManagerName = null;
- private static String queueName = null;
- // System exit status value (assume unset value to be 1)
- private static int status = 1;
- /**
- * Main method
- *
- * @param args
- */
- public static void main(String[] args) {
- // Parse the arguments
- args = new String[]{ “-m”,”QMTest”, “-d”,”testQueue”};
- parseArgs(args);
- // Variables
- Connection connection = null;
- Session session = null;
- Queue destination = null;
- QueueBrowser browser = null;
- try {
- // Create a connection factory
- JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
- JmsConnectionFactory cf = ff.createConnectionFactory();
- // Set the properties
- cf.setStringProperty(WMQConstants.WMQ_HOST_NAME, host);
- cf.setIntProperty(WMQConstants.WMQ_PORT, port);
- cf.setStringProperty(WMQConstants.WMQ_CHANNEL, channel);
- cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
- cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, queueManagerName);
- // Create JMS objects
- connection = cf.createConnection();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = session.createQueue(queueName);
- browser = session.createBrowser(destination);
- // Start the connection
- connection.start();
- // And, browse the message
- //浏览多条消息,并放到枚举类型里
- Enumeration messages = browser.getEnumeration();
- int count = 0;
- Message current;
- System.out.println(“Browse starts”);
- while (messages.hasMoreElements()) {
- current = (Message) messages.nextElement();
- System.out.println(“\nMessage “ + ++count + “:\n”);
- System.out.println(current);
- }
- System.out.println(“\nNo more messages\n”);
- recordSuccess();
- }
- catch (JMSException jmsex) {
- recordFailure(jmsex);
- }
- finally {
- if (browser != null) {
- try {
- browser.close();
- }
- catch (JMSException jmsex) {
- System.out.println(“Browser could not be closed.”);
- recordFailure(jmsex);
- }
- }
- if (session != null) {
- try {
- session.close();
- }
- catch (JMSException jmsex) {
- System.out.println(“Session could not be closed.”);
- recordFailure(jmsex);
- }
- }
- if (connection != null) {
- try {
- connection.close();
- }
- catch (JMSException jmsex) {
- System.out.println(“Connection could not be closed.”);
- recordFailure(jmsex);
- }
- }
- }
- System.exit(status);
- return;
- } // end main()
- /**
- * Process a JMSException and any associated inner exceptions.
- *
- * @param jmsex
- */
- private static void processJMSException(JMSException jmsex) {
- System.out.println(jmsex);
- Throwable innerException = jmsex.getLinkedException();
- if (innerException != null) {
- System.out.println(“Inner exception(s):”);
- }
- while (innerException != null) {
- System.out.println(innerException);
- innerException = innerException.getCause();
- }
- return;
- }
- /**
- * Record this run as successful.
- */
- private static void recordSuccess() {
- System.out.println(“SUCCESS”);
- status = 0;
- return;
- }
- /**
- * Record this run as failure.
- *
- * @param ex
- */
- private static void recordFailure(Exception ex) {
- if (ex != null) {
- if (ex instanceof JMSException) {
- processJMSException((JMSException) ex);
- }
- else {
- System.out.println(ex);
- }
- }
- System.out.println(“FAILURE”);
- status = -1;
- return;
- }
- /**
- * Parse user supplied arguments.
- *
- * @param args
- */
- private static void parseArgs(String[] args) {
- try {
- int length = args.length;
- if (length == 0) {
- throw new IllegalArgumentException(“No arguments! Mandatory arguments must be specified.”);
- }
- if ((length % 2) != 0) {
- throw new IllegalArgumentException(“Incorrect number of arguments!”);
- }
- int i = 0;
- while (i < length) {
- if ((args[i]).charAt(0) != ‘-‘) {
- throw new IllegalArgumentException(“Expected a ‘-‘ character next: “ + args[i]);
- }
- char opt = (args[i]).toLowerCase().charAt(1);
- switch (opt) {
- case ‘h’ :
- host = args[++i];
- break;
- case ‘p’ :
- port = Integer.parseInt(args[++i]);
- break;
- case ‘l’ :
- channel = args[++i];
- break;
- case ‘m’ :
- queueManagerName = args[++i];
- break;
- case ‘d’ :
- queueName = args[++i];
- break;
- default : {
- throw new IllegalArgumentException(“Unknown argument: “ + opt);
- }
- }
- ++i;
- }
- if (queueManagerName == null) {
- throw new IllegalArgumentException(“A queueManager name must be specified.”);
- }
- if (queueName == null) {
- throw new IllegalArgumentException(“A queue name must be specified.”);
- }
- }
- catch (Exception e) {
- System.out.println(e.getMessage());
- printUsage();
- System.exit(-1);
- }
- return;
- }
- /**
- * Display usage help.
- */
- private static void printUsage() {
- System.out.println(“\nUsage:”);
- System.out.println(“JmsBrowser -m queueManagerName -d queueName [-h host -p port -l channel]“);
- return;
- }
- } // end class
运行之前当然需要先创建好IBM MQ的服务端,并且创建好队列管理器和队列。
原文地址:http://qiaokeli.iteye.com/blog/1776186
还没有评论,来说两句吧...