ActiveMq 使用例子
ActiveMq 下载地址
https://gitee.com/Kkk9527/Third-party-plug-ins/raw/master/software/activemq/apache-activemq-5.11.1-bin.zip
1,进入apache-activemq-5.11.1\bin\win64 启动Mq activemq.bat
2,启动成功后,再本地能打开 http://localhost:8161/admin/ 就是启动成功了!默认账号密码 admin admin
3,java连接mq的jar只需要一个activemq-all.jar 下载地址
https://gitee.com/Kkk9527/Third-party-plug-ins/raw/master/software/jar/activemq-all.jar
4,mq接收端的线程池 接收端需要线程池 发送端可以不用
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestThreadPool {
/** * 线程池 * * @param args */
public static void threadReceiver() {
ArrayBlockingQueue<Runnable> arrayWorkQueue = new ArrayBlockingQueue(10);
LinkedBlockingDeque<Runnable> linkedWorkQueue = new LinkedBlockingDeque();
int count = 20;
ExecutorService threadPool = new ThreadPoolExecutor(5, // corePoolSize线程池中核心线程数
99, // maximumPoolSize 线程池中最大线程数
60, // 线程池中线程的最大空闲时间,超过这个时间空闲线程将被回收
TimeUnit.SECONDS, // 时间单位
// 下面是采用有界队列和无界队列的区别
arrayWorkQueue,
// linkedWorkQueue,
// 下面是jdk的四种执行策略
// new ThreadPoolExecutor.AbortPolicy() 这种策略直接抛出异常,丢弃任务。
// new ThreadPoolExecutor.DiscardPolicy()
// 这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。
// new ThreadPoolExecutor.CallerRunsPolicy() //线程调用运行该任务的
// execute
// 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。没看明白,当时是我的main线程执行的task5
new ThreadPoolExecutor.DiscardOldestPolicy()// 如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)
);
Thread thread = new Thread(new threadReceiver());
threadPool.execute(thread);
System.out.println("-----------------------------------");
threadPool.shutdown();
System.out.println("***********************************");
}
}
5,mq接收端代码
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.meididi.utils.OpeFunction;
/** * MQ接收端 * @author root * */
public class threadReceiver implements Runnable {
// Connection :JMS 客户端到JMS Provider 的连接
public static Connection connection = null;
@Override
public void run() {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory;
// Session: 一个发送或接收消息的线程
Session session;
// Destination :消息的目的地;消息发送给谁.
Destination destination;
// 消费者,消息接收者
MessageConsumer consumer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try {
// 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
// 启动
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
destination = session.createQueue("FirstQueue");
consumer = session.createConsumer(destination);
System.out.println("mq接收功能启动成功***********************");
while (true) {
//设置接收者接收消息的时间,为了便于测试,这里谁定为100s
TextMessage message = (TextMessage) consumer.receive(1000*100);
if (null != message) {
System.out.println("---------------"+OpeFunction.getNowTime()+":收到消息:" + message.getText());
} else {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}
6,mq发送端代码
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.meididi.utils.OpeFunction;
/** * MQ发送端 * @author root * */
public class Sender_text {
private static final int SEND_NUMBER = 5;
// MessageProducer:消息发送者
private static MessageProducer producer;
// Session: 一个发送或接收消息的线程
private static Session session;
public static Connection connection = null;
public static void getFactory() {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory;
// Connection :JMS 客户端到JMS Provider 的连接
// Destination :消息的目的地;消息发送给谁.
Destination destination;
// TextMessage message;
// 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try {
// 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
// 启动
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
destination = session.createQueue("FirstQueue");
// 得到消息生成者【发送者】
producer = session.createProducer(destination);
// 设置不持久化,此处学习,实际根据项目决定
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 构造消息,此处写死,项目就是参数,或者方法获取
// sendMessage("ok");
} catch (Exception e) {
e.printStackTrace();
} finally {
// try {
// if (null != connection)
// connection.close();
// } catch (Throwable ignore) {
// }
}
System.out.println("mq发送功能启动成功-----------------------");
}
/** * ActiveMq 发送方法 * @param session * @param producer * @param str * @throws Exception */
public static void sendMessage(String str)
throws Exception {
TextMessage message = session
.createTextMessage(str);
// 发送消息到目的地方
System.out.println(OpeFunction.getNowTime()+":发送消息:" + str);
producer.send(message);
session.commit();
}
}
还没有评论,来说两句吧...