jms基础入门(activemq)
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
1.ActiveMQ安装与使用
点我下载
下载后,进入目录apache-activemq-5.14.0\bin\win32|64
当然,还可以通过命令开启,
- 普通启动 ./activemq start
- 启动并指定日志文件 ./activemq start >tmp/smlog
- 后台启动方式nohup ./activemq start >/tmp/smlog
运行成功后,打开网页,访问http://localhost:8161/admin/
第一次登陆,会要求输入密码 , 默认账号admin 默认密码admin
以上是登陆成功的页面,不同版本可能页面不同,但功能能用就证明登陆成功,
java程序演示
创建一个java项目,导入jar包,
创建一个会话创建者,
package cn.itcast.demo;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JMSProducer {
// 默认连接用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
// 默认连接密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
// 默认连接地址
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
// 发送的消息数量
private static final int SENDNUM = 10;
public static void main(String[] args) {
// 连接工厂
ConnectionFactory connectionFactory;
// 连接
Connection connection = null;
// 会话 接受或者发送消息的线程
Session session;
// 消息的目的地
Destination destination;
// 消息生产者
MessageProducer messageProducer;
// 实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD,
JMSProducer.BROKEURL);
try {
// 通过连接工厂获取连接
connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建session
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 创建一个名称为HelloWorld的消息队列
destination = session.createQueue("HelloWorld");
// 创建消息生产者
messageProducer = session.createProducer(destination);
// 发送消息
sendMessage(session, messageProducer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/** * 发送消息 * * @param session * @param messageProducer * 消息生产者 * @throws Exception */
public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
for (int i = 0; i < JMSProducer.SENDNUM; i++) {
// 创建一条文本消息
TextMessage message = session.createTextMessage("ActiveMQ 发送消息" + i);
System.out.println("发送消息:Activemq 发送消息" + i);
// 通过消息生产者发出消息
messageProducer.send(message);
}
}
}
运行如上代码测试
然后我们进入管理页面看看
我们创建一个消费者,
package cn.itcast.demo;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JMSConsumer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;// 默认连接用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;// 默认连接密码
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;// 默认连接地址
public static void main(String[] args) {
ConnectionFactory connectionFactory;// 连接工厂
Connection connection = null;// 连接
Session session;// 会话 接受或者发送消息的线程
Destination destination;// 消息的目的地
MessageConsumer messageConsumer;// 消息的消费者
// 实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD,
JMSConsumer.BROKEURL);
try {
// 通过连接工厂获取连接
connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个连接HelloWorld的消息队列
destination = session.createQueue("HelloWorld");
// 创建消息消费者
messageConsumer = session.createConsumer(destination);
while (true) {
TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
if (textMessage != null) {
System.out.println("收到的消息:" + textMessage.getText());
} else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
运行结果如下
然后我们再进入到管理页面
其实这些代码没有什么技术量,,复制粘贴就行,,唯一需要改的就是消息队列名了,注意一下就行,
maven整合spring+jms(activemq)
applicationContext.xml(模板如下)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:task="http://www.springframework.org/schema/task" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd ">
<!-- 扫盲包 -->
<context:component-scan base-package="cn.itcast.activemq" />
<!-- ActiveMQ 连接工厂 -->
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
<amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://localhost:61616" userName="admin" password="admin" />
<!-- Spring Caching连接工厂 -->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!-- 同上,同理 -->
<!-- <constructor-arg ref="amqConnectionFactory" /> -->
<!-- Session缓存数量 -->
<property name="sessionCacheSize" value="100" />
</bean>
<!-- Spring JmsTemplate 的消息生产者 start-->
<!-- 定义JmsTemplate的Queue类型 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<constructor-arg ref="connectionFactory" />
<!-- 非pub/sub模型(发布/订阅),即队列模式 -->
<property name="pubSubDomain" value="false" />
</bean>
这也是二选一
<!-- 定义JmsTemplate的Topic类型 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<constructor-arg ref="connectionFactory" />
<!-- pub/sub模型(发布/订阅) -->
<property name="pubSubDomain" value="true" />
</bean>
<!--Spring JmsTemplate 的消息生产者 end-->
<!-- 消息消费者 start-->
<!-- 定义Queue监听器 -->
<jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
<!--消息队列名称 消费者类名,首字母小写,需要加上注解才行 -->
<jms:listener destination="test.queue" ref="queueReceiver1"/>
</jms:listener-container>
俩种监听方式,一个是一对一,一个是一对多,区别我就不多说了,不会自行百度,通常开发都是二选一,自己选择合适的
<!-- 定义Topic监听器 -->
<jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="test.topic" ref="topicReceiver1"/>
<jms:listener destination="test.topic" ref="topicReceiver2"/>
</jms:listener-container>
<!-- 消息消费者 end -->
</beans>
pom依赖
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>4.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>4.1.7.RELEASE</version>
</dependency>
<!-- <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>4.2</version> </dependency> -->
</dependencies>
好了,我们开始撸代码(做一个注册发短信提示激活功能)
首先写消费者
0.创建一个maven工程
1.复制原先准备好的pom文件
2.编写web.xml
<web-app xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" version="2.5">
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
<!-- 指定spring配置文件的位置,文件放在resources下 -->
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:applicationContext.xml</param-value>
</context-param>
</web-app>
3.配置applicationContext.xml文件(放在resources目录下)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:task="http://www.springframework.org/schema/task" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd ">
<!-- 扫盲包 -->
<context:component-scan base-package="cn.itcast.activemq" />
<!-- ActiveMQ 连接工厂 -->
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
<amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://localhost:61616" userName="admin" password="admin" />
<!-- Spring Caching连接工厂 -->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!-- 同上,同理 -->
<!-- <constructor-arg ref="amqConnectionFactory" /> -->
<!-- Session缓存数量 -->
<property name="sessionCacheSize" value="100" />
</bean>
<!-- 消息消费者 start-->
<!-- 定义Queue监听器 -->
<jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="sms.queue" ref="registerQueue"/>
</jms:listener-container>
<!-- 消息消费者 end -->
</beans>
编写消费者类
需要给applicationContext的包扫描扫到,
package cn.itcast.activemq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
/** * 发送短信系统, */
@Component
public class RegisterQueue implements MessageListener {
private Logger log = LoggerFactory.getLogger(RegisterQueue.class);
public void onMessage(Message message) {
//接收map类型数据
MapMessage map = (MapMessage)message;
try {
String telephone = map.getString("telephone"); //接收电话
String content = map.getString("content") ; //这是生产者存入的字段名
log.info("telephone:"+telephone);
log.info("content:"+content);
log.info("模拟发送短信");
} catch (JMSException e) {
e.printStackTrace();
}
}
}
最后我们需要注意的是,因为是一个单独的系统,所以我们需要配置一个tomcat,在pom文件里面
<plugins>
<plugin>
<groupId>org.apache.tomcat.maven</groupId>
<artifactId>tomcat7-maven-plugin</artifactId>
<version>2.2</version>
<configuration>
<uriEncoding>UTF-8</uriEncoding>
<path>/bos_sms</path>
<port>8892</port>
</configuration>
</plugin>
</plugins>
消费者已经写完了,然后我们开始写生产者(客户端),
由于客户端涉及相关业务,不能发完整代码,体谅一下
1.创建一个maven工程,
2.配置web.xml文件,
<-- 配置web.xml -->
<!--struts2拦截器-->
<filter>
<filter-name>struts2</filter-name>
<filter-class>org.apache.struts2.dispatcher.ng.filter.StrutsPrepareAndExecuteFilter</filter-class>
</filter>
<filter-mapping>
<filter-name>struts2</filter-name>
<url-pattern>*</url-pattern>
</filter-mapping>
<!--spring初始化配置-->
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:applicationContext.xml</param-value>
</context-param>
<!-- 配置spring框架的监听器 -->
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
配置applicationContext.xml
<!--把这下面的东西拷贝进去修改修改就行了,-->
<!-- 扫盲包 -->
<context:component-scan base-package="cn.itcast.activemq" />
<!-- ActiveMQ 连接工厂 -->
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
<amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://localhost:61616" userName="admin" password="admin" />
<!-- Spring Caching连接工厂 -->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!-- 同上,同理 -->
<!-- <constructor-arg ref="amqConnectionFactory" /> -->
<!-- Session缓存数量 -->
<property name="sessionCacheSize" value="100" />
</bean>
<!-- Spring JmsTemplate 的消息生产者 start-->
<!-- 定义JmsTemplate的Queue类型 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<constructor-arg ref="connectionFactory" />
<!-- 非pub/sub模型(发布/订阅),即队列模式 -->
<property name="pubSubDomain" value="false" />
</bean>
创建一个生产者
@Resource
private JmsTemplate template;
@Action(value = "customerAction_regist",results = {
@Result(name = "fail",location = "/signup-fail.html",type = "redirect"),
@Result(name = "success",location = "/signup-success.html",type = "redirect")})
public String regist(){
String sms_name = "sms.queue";//消息队列明,需要跟我们先前设置消费者的一样
final String content = "恭喜你,你的账号已注册,请登陆邮箱激活!";//消息内容
//创建一个消息匿名类
template.send(sms_name, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
MapMessage mapMessage = session.createMapMessage();//获取map的消息对象
mapMessage.setString("telephone",customer.getTelephone());
mapMessage.setString("content",content);
return mapMessage;
}
});
最后还需要配置一个tomcat ,
<plugins>
<plugin>
<groupId>org.apache.tomcat.maven</groupId>
<artifactId>tomcat7-maven-plugin</artifactId>
<version>2.2</version>
<configuration>
<uriEncoding>UTF-8</uriEncoding>
<path>/bos_before</path>
<port>8890</port>
</configuration>
</plugin>
</plugins>
最后启动运行试试,需要注意的是,俩个项目都要启动,
sms服务启动成功,接下来我们启动另外一个服务,
也启动成功,我们开始代码测试
测试抛出异常
Exception occurred during processing request: null
java.lang.NullPointerException
根据提示信息发现 , JmsTemplate template;注入失败,
然后一顿乱找,发现是切面的影响的,,这个有点坑
把这行代码注释掉就没问题了
<!--使用切面注解扫描-->
<aop:aspectj-autoproxy proxy-target-class="true"/>
原因是代理类不能使用JmsTemplate,
最后运行是没问题的,
jms就讲到这里,提前祝大家圣诞节快乐
还没有评论,来说两句吧...