ActiveMq 使用例子

偏执的太偏执、 2021-09-26 09:40 514阅读 0赞

ActiveMq 下载地址
https://gitee.com/Kkk9527/Third-party-plug-ins/raw/master/software/activemq/apache-activemq-5.11.1-bin.zip
1,进入apache-activemq-5.11.1\bin\win64 启动Mq activemq.bat
2,启动成功后,再本地能打开 http://localhost:8161/admin/ 就是启动成功了!默认账号密码 admin admin
3,java连接mq的jar只需要一个activemq-all.jar 下载地址
https://gitee.com/Kkk9527/Third-party-plug-ins/raw/master/software/jar/activemq-all.jar
4,mq接收端的线程池 接收端需要线程池 发送端可以不用

  1. import java.util.concurrent.ArrayBlockingQueue;
  2. import java.util.concurrent.ExecutorService;
  3. import java.util.concurrent.LinkedBlockingDeque;
  4. import java.util.concurrent.ThreadPoolExecutor;
  5. import java.util.concurrent.TimeUnit;
  6. public class TestThreadPool {
  7. /** * 线程池 * * @param args */
  8. public static void threadReceiver() {
  9. ArrayBlockingQueue<Runnable> arrayWorkQueue = new ArrayBlockingQueue(10);
  10. LinkedBlockingDeque<Runnable> linkedWorkQueue = new LinkedBlockingDeque();
  11. int count = 20;
  12. ExecutorService threadPool = new ThreadPoolExecutor(5, // corePoolSize线程池中核心线程数
  13. 99, // maximumPoolSize 线程池中最大线程数
  14. 60, // 线程池中线程的最大空闲时间,超过这个时间空闲线程将被回收
  15. TimeUnit.SECONDS, // 时间单位
  16. // 下面是采用有界队列和无界队列的区别
  17. arrayWorkQueue,
  18. // linkedWorkQueue,
  19. // 下面是jdk的四种执行策略
  20. // new ThreadPoolExecutor.AbortPolicy() 这种策略直接抛出异常,丢弃任务。
  21. // new ThreadPoolExecutor.DiscardPolicy()
  22. // 这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。
  23. // new ThreadPoolExecutor.CallerRunsPolicy() //线程调用运行该任务的
  24. // execute
  25. // 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。没看明白,当时是我的main线程执行的task5
  26. new ThreadPoolExecutor.DiscardOldestPolicy()// 如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)
  27. );
  28. Thread thread = new Thread(new threadReceiver());
  29. threadPool.execute(thread);
  30. System.out.println("-----------------------------------");
  31. threadPool.shutdown();
  32. System.out.println("***********************************");
  33. }
  34. }

5,mq接收端代码

  1. import javax.jms.Connection;
  2. import javax.jms.ConnectionFactory;
  3. import javax.jms.Destination;
  4. import javax.jms.MessageConsumer;
  5. import javax.jms.Session;
  6. import javax.jms.TextMessage;
  7. import org.apache.activemq.ActiveMQConnection;
  8. import org.apache.activemq.ActiveMQConnectionFactory;
  9. import com.meididi.utils.OpeFunction;
  10. /** * MQ接收端 * @author root * */
  11. public class threadReceiver implements Runnable {
  12. // Connection :JMS 客户端到JMS Provider 的连接
  13. public static Connection connection = null;
  14. @Override
  15. public void run() {
  16. // ConnectionFactory :连接工厂,JMS 用它创建连接
  17. ConnectionFactory connectionFactory;
  18. // Session: 一个发送或接收消息的线程
  19. Session session;
  20. // Destination :消息的目的地;消息发送给谁.
  21. Destination destination;
  22. // 消费者,消息接收者
  23. MessageConsumer consumer;
  24. connectionFactory = new ActiveMQConnectionFactory(
  25. ActiveMQConnection.DEFAULT_USER,
  26. ActiveMQConnection.DEFAULT_PASSWORD,
  27. "tcp://localhost:61616");
  28. try {
  29. // 构造从工厂得到连接对象
  30. connection = connectionFactory.createConnection();
  31. // 启动
  32. connection.start();
  33. // 获取操作连接
  34. session = connection.createSession(Boolean.FALSE,
  35. Session.AUTO_ACKNOWLEDGE);
  36. // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
  37. destination = session.createQueue("FirstQueue");
  38. consumer = session.createConsumer(destination);
  39. System.out.println("mq接收功能启动成功***********************");
  40. while (true) {
  41. //设置接收者接收消息的时间,为了便于测试,这里谁定为100s
  42. TextMessage message = (TextMessage) consumer.receive(1000*100);
  43. if (null != message) {
  44. System.out.println("---------------"+OpeFunction.getNowTime()+":收到消息:" + message.getText());
  45. } else {
  46. break;
  47. }
  48. }
  49. } catch (Exception e) {
  50. e.printStackTrace();
  51. } finally {
  52. try {
  53. if (null != connection)
  54. connection.close();
  55. } catch (Throwable ignore) {
  56. }
  57. }
  58. }
  59. }

6,mq发送端代码

  1. import javax.jms.Connection;
  2. import javax.jms.ConnectionFactory;
  3. import javax.jms.DeliveryMode;
  4. import javax.jms.Destination;
  5. import javax.jms.MessageProducer;
  6. import javax.jms.Session;
  7. import javax.jms.TextMessage;
  8. import org.apache.activemq.ActiveMQConnection;
  9. import org.apache.activemq.ActiveMQConnectionFactory;
  10. import com.meididi.utils.OpeFunction;
  11. /** * MQ发送端 * @author root * */
  12. public class Sender_text {
  13. private static final int SEND_NUMBER = 5;
  14. // MessageProducer:消息发送者
  15. private static MessageProducer producer;
  16. // Session: 一个发送或接收消息的线程
  17. private static Session session;
  18. public static Connection connection = null;
  19. public static void getFactory() {
  20. // ConnectionFactory :连接工厂,JMS 用它创建连接
  21. ConnectionFactory connectionFactory;
  22. // Connection :JMS 客户端到JMS Provider 的连接
  23. // Destination :消息的目的地;消息发送给谁.
  24. Destination destination;
  25. // TextMessage message;
  26. // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
  27. connectionFactory = new ActiveMQConnectionFactory(
  28. ActiveMQConnection.DEFAULT_USER,
  29. ActiveMQConnection.DEFAULT_PASSWORD,
  30. "tcp://localhost:61616");
  31. try {
  32. // 构造从工厂得到连接对象
  33. connection = connectionFactory.createConnection();
  34. // 启动
  35. connection.start();
  36. // 获取操作连接
  37. session = connection.createSession(Boolean.TRUE,
  38. Session.AUTO_ACKNOWLEDGE);
  39. // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
  40. destination = session.createQueue("FirstQueue");
  41. // 得到消息生成者【发送者】
  42. producer = session.createProducer(destination);
  43. // 设置不持久化,此处学习,实际根据项目决定
  44. producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
  45. // 构造消息,此处写死,项目就是参数,或者方法获取
  46. // sendMessage("ok");
  47. } catch (Exception e) {
  48. e.printStackTrace();
  49. } finally {
  50. // try {
  51. // if (null != connection)
  52. // connection.close();
  53. // } catch (Throwable ignore) {
  54. // }
  55. }
  56. System.out.println("mq发送功能启动成功-----------------------");
  57. }
  58. /** * ActiveMq 发送方法 * @param session * @param producer * @param str * @throws Exception */
  59. public static void sendMessage(String str)
  60. throws Exception {
  61. TextMessage message = session
  62. .createTextMessage(str);
  63. // 发送消息到目的地方
  64. System.out.println(OpeFunction.getNowTime()+":发送消息:" + str);
  65. producer.send(message);
  66. session.commit();
  67. }
  68. }

发表评论

表情:
评论列表 (有 0 条评论,514人围观)

还没有评论,来说两句吧...

相关阅读