RabbitMQ入门到实战(1)-Hello RabbitMQ

清疚 2022-05-27 03:16 288阅读 0赞

转载于:http://www.cnblogs.com/4----/p/6518801.html

1.简介

本篇博文介绍了在windows平台下安装RabbitMQ Server端,并用JAVA代码实现收发消息

2.安装RabbitMQ

  1. RabbitMQ是用Erlang开发的,所以需要先安装Erlang环境,在这里下载对应系统的Erlang安装包进行安装
  2. 点击这里下载对应平台的RabbitMQ安装包进行安装

Windows平台安装完成后如图

1120662-20170308174944703-1548066211.png

3.启用RabbitMQ Web控制台

RabbitMQ提供一个控制台,用于管理和监控RabbitMQ,默认是不启动的,需要运行以下命令进行启动

  1. 点击上图的Rabbit Command Prompt,打开rabbitMQ控制台
  2. 在官方介绍管理控制台的页面,可以看到,输入以下命令启动后台控制插件

    rabbitmq-plugins enable rabbitmq_management

  3. 登录后台页面:http://localhost:15672/ 密码和用户名都是 guest ,界面如下

1120662-20170308175012906-706376283.png

目前可以先不用理会此界面,后面使用到时会详细介绍,也可以到这里查看官方文档。

4.编写MessageSender

Spring对RabbitMQ已经进行了封装,正常使用中,会使用Spring集成,第一个项目中,我们先不考虑那么多

在IDE中新建一个Maven项目,并在pom.xml中贴入如下依赖,RabbitMQ的最新版本依赖可以在这里找到

  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>4.1.0</version>
  5. </dependency>

等待Maven下载完成后,就可以在Maven Dependencies中看到RabbitMQ的JAR

1120662-20170308175028656-1650232142.png

在这里,我们发现,RabbitMQ的日志依赖了slf4j-api这个包,slf4j-api并不是一个日志实现,这样子是打不出日志的,所以,我们给pom加上一个日志实现,这里用了logback

  1. <dependency>
  2. <groupId>ch.qos.logback</groupId>
  3. <artifactId>logback-classic</artifactId>
  4. <version>1.2.1</version>
  5. </dependency>

之后maven依赖如下,可以放心写代码了

1120662-20170308175057844-316719986.png

新建一个MessageSender类,代码如下

复制代码

  1. 1 import java.io.IOException;
  2. 2 import java.util.concurrent.TimeoutException;
  3. 3
  4. 4 import org.slf4j.Logger;
  5. 5 import org.slf4j.LoggerFactory;
  6. 6
  7. 7 import com.rabbitmq.client.Channel;
  8. 8 import com.rabbitmq.client.Connection;
  9. 9 import com.rabbitmq.client.ConnectionFactory;
  10. 10
  11. 11 public class MessageSender {
  12. 12
  13. 13 private Logger logger = LoggerFactory.getLogger(MessageSender.class);
  14. 14
  15. 15 //声明一个队列名字
  16. 16 private final static String QUEUE_NAME = "hello";
  17. 17
  18. 18 public boolean sendMessage(String message){
  19. 19 //new一个RabbitMQ的连接工厂
  20. 20 ConnectionFactory factory = new ConnectionFactory();
  21. 21 //设置需要连接的RabbitMQ地址,这里指向本机
  22. 22 factory.setHost("127.0.0.1");
  23. 23 Connection connection = null;
  24. 24 Channel channel = null;
  25. 25 try {
  26. 26 //尝试获取一个连接
  27. 27 connection = factory.newConnection();
  28. 28 //尝试创建一个channel
  29. 29 channel = connection.createChannel();
  30. 30 //这里的参数在后面详解
  31. 31 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  32. 32 //注意这里调用了getBytes(),发送的其实是byte数组,接收方收到消息后,需要重新组装成String
  33. 33 channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  34. 34 logger.info("Sent '" + message + "'");
  35. 35 //关闭channel和连接
  36. 36 channel.close();
  37. 37 connection.close();
  38. 38 } catch (IOException | TimeoutException e) {
  39. 39 //失败后记录日志,返回false,代表发送失败
  40. 40 logger.error("send message faild!",e);
  41. 41 return false;
  42. 42 }
  43. 43 return true;
  44. 44 }
  45. 45 }

复制代码

然后在App类的main方法中调用sendMessage

  1. 1 public class App {
  2. 2 public static void main( String[] args ){
  3. 3 MessageSender sender = new MessageSender();
  4. 4 sender.sendMessage("hello RabbitMQ!");
  5. 5 }
  6. 6 }

打印日志如下

1120662-20170308175116219-326797014.png

打开RabbitMQ的控制台,可以看到消息已经进到了RabbitMQ中

1120662-20170308175128984-2070017038.png

点进去,用控制台自带的getMessage功能,可以看到消息已经成功由RabbitMQ管理了

1120662-20170308175149531-331735340.png

至此,MessageSender已经写好了,在该类的31和33行,我们分别调用了队列声明和消息发送

  1. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  2. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

queueDeclare,有很多参数,我们可以看一下他的源码,注释上有详细的解释,我简单翻译了一下

复制代码

  1. 1 /**
  2. 2 * Declare a queue 声明一个队列
  3. 3 * @see com.rabbitmq.client.AMQP.Queue.Declare
  4. 4 * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
  5. 5 * @param queue the name of the queue队列的名字
  6. 6 * @param durable true if we are declaring a durable queue (the queue will survive a server restart)是否持久化,为true则在rabbitMQ重启后生存
  7. 7 * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)是否是排他性队列(别人看不到),只对当前连接有效,当前连接断开后,队列删除(设置了持久化也删除)
  8. 8 * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)自动删除,在最后一个连接断开后删除队列
  9. 9 * @param arguments other properties (construction arguments) for the queue 其他参数
  10. 10 * @return a declaration-confirm method to indicate the queue was successfully declared
  11. 11 * @throws java.io.IOException if an error is encountered
  12. 12 */
  13. 13 Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
  14. 14 Map<String, Object> arguments) throws IOException;

复制代码

前面4个都非常好理解,最后一个“其他参数”,到底是什么其他参数,这个东西真的很难找,用到再解释吧,官方文档如下

  • TTL Time To Live 存活时间

  • Dead Lettering 遗言,当消息死亡时,做些什么

  • Length Limit 长度限制

  • Priority Queues 优先级

basicPublish的翻译如下

复制代码

  1. 1 /**
  2. 2 * Publish a message.发送一条消息
  3. 3 *
  4. 4 * Publishing to a non-existent exchange will result in a channel-level
  5. 5 * protocol exception, which closes the channel.
  6. 6 *
  7. 7 * Invocations of <code>Channel#basicPublish</code> will eventually block if a
  8. 8 * <a href="http://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect.
  9. 9 *
  10. 10 * @see com.rabbitmq.client.AMQP.Basic.Publish
  11. 11 * @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>
  12. 12 * @param exchange the exchange to publish the message to 交换模式,会在后面讲,官方文档在这里 13 * @param routingKey the routing key 控制消息发送到哪个队列
  13. 14 * @param props other properties for the message - routing headers etc 其他参数
  14. 15 * @param body the message body 消息,byte数组
  15. 16 * @throws java.io.IOException if an error is encountered
  16. 17 */
  17. 18 void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

复制代码

这里又有个其他参数,它的类型是这样的,设置消息的一些详细属性

1120662-20170308174332594-746819850.png

5.编写MessageConsumer

为了和Sender区分开,新建一个Maven项目MessageConsumer

复制代码

  1. 1 package com.liyang.ticktock.rabbitmq;
  2. 2
  3. 3 import java.io.IOException;
  4. 4 import java.util.concurrent.TimeoutException;
  5. 5
  6. 6 import org.slf4j.Logger;
  7. 7 import org.slf4j.LoggerFactory;
  8. 8
  9. 9 import com.rabbitmq.client.AMQP;
  10. 10 import com.rabbitmq.client.Channel;
  11. 11 import com.rabbitmq.client.Connection;
  12. 12 import com.rabbitmq.client.ConnectionFactory;
  13. 13 import com.rabbitmq.client.Consumer;
  14. 14 import com.rabbitmq.client.DefaultConsumer;
  15. 15 import com.rabbitmq.client.Envelope;
  16. 16
  17. 17 public class MessageConsumer {
  18. 18
  19. 19 private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
  20. 20
  21. 21 public boolean consume(String queueName){
  22. 22 //连接RabbitMQ
  23. 23 ConnectionFactory factory = new ConnectionFactory();
  24. 24 factory.setHost("127.0.0.1");
  25. 25 Connection connection = null;
  26. 26 Channel channel = null;
  27. 27 try {
  28. 28 connection = factory.newConnection();
  29. 29 channel = connection.createChannel();
  30. 30 //这里声明queue是为了取消息的时候,queue肯定会存在
  31. 31 //注意,queueDeclare是幂等的,也就是说,消费者和生产者,不论谁先声明,都只会有一个queue
  32. 32 channel.queueDeclare(queueName, false, false, false, null);
  33. 33
  34. 34 //这里重写了DefaultConsumer的handleDelivery方法,因为发送的时候对消息进行了getByte(),在这里要重新组装成String
  35. 35 Consumer consumer = new DefaultConsumer(channel){
  36. 36 @Override
  37. 37 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
  38. 38 throws IOException {
  39. 39 String message = new String(body, "UTF-8");
  40. 40 logger.info("Received '" + message + "'");
  41. 41 }
  42. 42 };
  43. 43 //上面是声明消费者,这里用声明的消费者消费掉队列中的消息
  44. 44 channel.basicConsume(queueName, true, consumer);
  45. 45
  46. 46 //这里不能关闭连接,调用了消费方法后,消费者会一直连接着rabbitMQ等待消费
  47. 47
  48. 48 } catch (IOException | TimeoutException e) {
  49. 49 //失败后记录日志,返回false,代表消费失败
  50. 50 logger.error("send message faild!",e);
  51. 51 return false;
  52. 52 }
  53. 53
  54. 54
  55. 55 return true;
  56. 56 }
  57. 57 }

复制代码

然后在App的main方法中调用Cunsumer进行消费

复制代码

  1. 1 public class App
  2. 2 {
  3. 3 //这个队列名字要和生产者中的名字一样,否则找不到队列
  4. 4 private final static String QUEUE_NAME = "hello";
  5. 5
  6. 6 public static void main( String[] args )
  7. 7 {
  8. 8 MessageConsumer consumer = new MessageConsumer();
  9. 9 consumer.consume(QUEUE_NAME);
  10. 10 }
  11. 11 }

复制代码

结果如下,消费者一直在等待消息,每次有消息进来,就会立刻消费掉

1120662-20170308175234484-92505214.png

6.多个消费者同时消费一个队列

改造一下Consumer

1120662-20170308175253828-1566810348.png

在App中new多个消费者

1120662-20170308175307688-1077016419.png

改造Sender,使它不停的往RabbitMQ中发送消息

1120662-20170308175334000-316933034.png

启动Sender

1120662-20170308175353734-688735600.png

启动Consumer,发现消息很平均的发给四个客户端,一人一个,谁也不插队

1120662-20170308175418734-1623270831.png

如果我们把速度加快呢?把Sender的休息时间去掉,发现消费开始变得没有规律了,其实呢,它还是有规律的,这个是RabbitMQ的特性,称作“Round-robin dispatching”,消息会平均的发送给每一个消费者,可以看第一第二行,消息分别是56981和56985,相应的82、82、84都被分给了其他线程,只是在当前线程的时间片内,可以处理这么多任务,所以就一次打印出来了

1120662-20170308175437641-681977166.png

7.结束语

这一章介绍了从安装到用JAVA语言编写生产者与消费者,在这里只是简单的消费消息并打印日志,如果一个消息需要处理的时间很长,而处理的过程中,这个消费者挂掉了,那消息会不会丢失呢?答案是肯定的,而且已经分配给这个消费者,但还没来得及处理的消息也会一并丢失掉,这个问题,RabbitMQ早就考虑到了,并且提供了解决方案,下一篇博文将进行详细介绍

发表评论

表情:
评论列表 (有 0 条评论,288人围观)

还没有评论,来说两句吧...

相关阅读