activeMQ入门demo
首先从官网下载http://activemq.apache.org/download-archives.html
解压后执行bin下面的activemq.bat即可启动
新建maven项目,导入jar包
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.14.5</version>
</dependency>
activemq主要有以下几个成员:
- ConnectionFactory: 连接工厂,封装好的连接池
- Connection: 连接
- Session:
- Destination:消息目的地
- MessageProduce:消息产生者
- MessageConsumer:消费者
上代码,以下是生产者:
public class Sender {
private final static int num = 5;
public static void main(String[] args) {
ConnectionFactory connectionFactory;// 连接工厂
Connection connection = null;// 连接
Session session;// 消息进程
Destination destination;// 消息目的地
MessageProducer produce;// 消息发送者
connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616");// 建立连接工厂
try {
connection = connectionFactory.createConnection();// 从连接工厂获取连接
connection.start();//
/*AUTO_ACKNOWLEDGE:当客户端从 receive 或 onMessage成功返回时,Session 自动签收客户端的这条消息的收条 *CLIENT_ACKNOWLEDGE:需要调用acknowledge方法来接收 */
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("hello");// 设置消息队列,发送者跟消费者要相同才能相互传递消息
produce = session.createProducer(destination);
produce.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 发送业务消息
sendMessage(session, produce, "test");
session.commit();
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (null != connection) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
private static void sendMessage(Session session, MessageProducer produce, String name) throws JMSException {
TextMessage message = session.createTextMessage("i am message number" + name);
produce.send(message);
}
}
消费者:
public class Receiver {
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Destination destination;
MessageConsumer consumer;//消费者
connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616");
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);//设置为true即消费消息后不清除
destination = session.createQueue("hello");
consumer = session.createConsumer(destination);
while (true) {
TextMessage message = (TextMessage) consumer.receive(30*1000);//设置消费者消费时间
if (null != message) {
// message.acknowledge();
System.out.println("shoudao "+message.getText());
} else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (null != connection) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
还没有评论,来说两句吧...