rabbitMQ的简单使用

女爷i 2022-04-05 14:14 403阅读 0赞

文章目录

    • direct模式:直来直往,指哪到哪儿
      • 配置:
      • 生产者:
      • 消费者:
      • 特点:
  • 交换机模式:
    • 交换机模式1:topic模式,
      • 配置:
      • 生产者:
      • 消费者:
      • 过程:
    • 交换机模式2:Fanout模式:
      • 配置:
      • 生产者:
      • 消费者:
      • 特点:
    • 交换机模式3:Headers模式,
      • 配置:
      • 生产者:
      • 消费者:
      • 特点:
    • 总结:

生产者:制造消息
消费者:获取消息,处理消息
路由机制:中转消息
四种模式:

后三种是交换机模式

  1. Direct:不需要交换机,直接发送到指定队列,一对一
  2. Topic 需要key
  3. Fanout 广播模式,不需要key
  4. Headers:注意模式比较奇特,需要一个map,里面包含需要的header,只有匹配(或者不匹配)其中的header,才能放到对应的队列中

direct模式:直来直往,指哪到哪儿

代码demo:
direct模式:

配置:

  1. @Configuration
  2. public class MQConfig {
  3. public static final String QUEUE="queue";
  4. /** * 1, diect 模式 * @return */
  5. @Bean
  6. public Queue queue(){
  7. return new Queue("queue",true);
  8. }
  9. }

生产者:

  1. @Service
  2. @Slf4j
  3. public class MQSender {
  4. @Autowired
  5. AmqpTemplate amqpTemplate;
  6. public void send(Object message){
  7. String msg=RedisService.beanToString(message);
  8. log.info("send at:{},msg:{} ", MQConfig.QUEUE,msg);
  9. amqpTemplate.convertAndSend( MQConfig.QUEUE,msg);
  10. }
  11. }

消费者:

  1. @Service
  2. @Slf4j
  3. public class MQReceiver {
  4. @Autowired
  5. AmqpTemplate amqpTemplate;
  6. @RabbitListener(queues=MQConfig.QUEUE)
  7. public void receive(String message){
  8. log.warn("receive message from {},data:{}",MQConfig.QUEUE,message);
  9. }
  10. }

特点:

很直接,除了生产者和消费者,就没有更多的部分

交换机模式:

比direct多了个中间商(媒婆),交换机

交换机模式1:topic模式,

生产者发送消息时需要指定key和交换机,交换机知道key对应的queue,那么交换机就能将消息放到适合的队列中,交换机任务也就完成了。

配置:

  1. @Configuration
  2. public class MQConfig {
  3. public static final String TOPIC_QUEUE1="topic.queue1";
  4. public static final String TOPIC_QUEUE2="topic.queue2";
  5. /** * 2 ,topic 模式 交换机 * @return */
  6. @Bean
  7. public Queue topicQueue1(){
  8. return new Queue(TOPIC_QUEUE1,true);
  9. }
  10. @Bean
  11. public Queue topicQueue2(){
  12. return new Queue(TOPIC_QUEUE2,true);
  13. }
  14. @Bean
  15. public TopicExchange topicExchange(){
  16. return new TopicExchange(TOPIC_EXCHANGE);
  17. }
  18. /** *给队列绑定上key和交换机 */
  19. @Bean
  20. public Binding topicBinding1(){
  21. return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.key1");
  22. }
  23. @Bean
  24. public Binding topicBinding2(){
  25. /** * # 代表通配符 */
  26. return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.#");
  27. }
  28. }

生产者:

发送消息时需要指定交换机和key

  1. @Service
  2. @Slf4j
  3. public class MQSender {
  4. @Autowired
  5. AmqpTemplate amqpTemplate;
  6. public void sendTopic(Object message){
  7. String msg=RedisService.beanToString(message);
  8. log.info("send topic message: {}",msg);
  9. amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE,"topic.key1",msg+" 1");
  10. amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE,"topic.key2",msg+" 2");
  11. }
  12. }

消费者:

只需要监听需要的队列即可

  1. @Service
  2. @Slf4j
  3. public class MQReceiver {
  4. @Autowired
  5. AmqpTemplate amqpTemplate;
  6. /** * topic 交换机模式下的receiver */
  7. @RabbitListener( queues = MQConfig.TOPIC_QUEUE1)
  8. public void receiveTopic1(String message){
  9. log.info("receiveTopic queue1 :{}",message);
  10. }
  11. @RabbitListener( queues = MQConfig.TOPIC_QUEUE2)
  12. public void receiveTopic2(String message){
  13. log.info("receiveTopic queue2 :{}",message);
  14. }
  15. }

过程:

生产者->message+key1->Exchange->queue(key1)->消费者

交换机模式2:Fanout模式:

广播模式,不指定key,通过交换机将消息发给与该交换机绑定的队列

配置:

  1. @Configuration
  2. public class MQConfig {
  3. public static final String QUEUE="queue";
  4. public static final String TOPIC_QUEUE1="topic.queue1";
  5. public static final String TOPIC_QUEUE2="topic.queue2";
  6. public static final String FANOUT_EXCHANGE="fanoutExchange";
  7. /** * 3, Fanout 模式 (广播模式) */
  8. @Bean
  9. public FanoutExchange fanoutExchange(){
  10. return new FanoutExchange(FANOUT_EXCHANGE);
  11. }
  12. /** * 广播模式绑定 */
  13. @Bean
  14. public Binding fanoutBinding1(){
  15. return BindingBuilder.bind(topicQueue1()).to(fanoutExchange());
  16. }
  17. @Bean
  18. public Binding fanoutBinding2(){
  19. return BindingBuilder.bind(topicQueue2()).to(fanoutExchange());
  20. }
  21. }

生产者:

  1. @Service
  2. @Slf4j
  3. public class MQSender {
  4. @Autowired
  5. AmqpTemplate amqpTemplate;
  6. public void sendFanout(Object message){
  7. String msg=RedisService.beanToString(message);
  8. log.info("send FANOUT message: {}",msg);
  9. amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE,"",msg+" FANOUT 1");
  10. }
  11. }

消费者:

同topic模式下的消费者

特点:

无需key,发送的是一条消息,但是接受这条小东西的队列一般是多个(广播)

交换机模式3:Headers模式,

匹配要求很多(通过一个map的key和value逐个与消息发送时设置的properties相比对,有wehereAll,whereAny,where 等等)

配置:

  1. @Configuration
  2. public class MQConfig {
  3. public static final String HEADER_QUEUE="header.queue";
  4. public static final String HEADER_EXCHANGE="headerExchange";
  5. /** * 4 header 模式 */
  6. @Bean
  7. public HeadersExchange headersExchange(){
  8. return new HeadersExchange(HEADER_EXCHANGE);
  9. }
  10. @Bean
  11. public Queue headerQueue(){
  12. return new Queue(HEADER_QUEUE,true);
  13. }
  14. @Bean
  15. public Binding headerBinding(){
  16. Map<String,Object> map=new HashMap<>();
  17. map.put("header1","value1");
  18. map.put("header2","value2");
  19. //满足符合这个map里面的数据才能放入队列
  20. return BindingBuilder.bind(headerQueue()).to(headersExchange()).whereAll(map).match();
  21. }
  22. }

生产者:

  1. @Service
  2. @Slf4j
  3. public class MQSender {
  4. @Autowired
  5. AmqpTemplate amqpTemplate;
  6. public void sendHeader(Object message){
  7. String msg=RedisService.beanToString(message);
  8. log.info("send HEADERS message: {}",msg);
  9. MessageProperties properties=new MessageProperties();
  10. properties.setHeader("header1","value1");
  11. properties.setHeader("header2","value2");
  12. Message obj=new Message(msg.getBytes(),properties);
  13. amqpTemplate.convertAndSend(MQConfig.HEADER_EXCHANGE,"",obj);
  14. }
  15. }

消费者:

比较特别,发送和接收都是byte[]

  1. @Service
  2. @Slf4j
  3. public class MQReceiver {
  4. @Autowired
  5. AmqpTemplate amqpTemplate;
  6. /** * headers 模式下的receiver */
  7. @RabbitListener(queues = MQConfig.HEADER_QUEUE)
  8. public void receiveHeader(byte [] message){
  9. String msg=new String(message);
  10. log.info("header receiver :{}",msg);
  11. }
  12. }

特点:

匹配很灵活,相当于是复合的key;消息是被转为byte[]传输和接收

总结:

direct模式:非交换机模式,直接发送各某个队列

topic模式:交换机+key,交换机通过消息的key去匹配哪个队列可以被放入此消息

fanout模式:交换机+广播 ,不使用key,交换机直接将信息放入与其绑定的队列

headers模式:交换机+map,可以看成是复合的key,交换机去将消息的properties与队列绑定时设置的header map值取匹配

发表评论

表情:
评论列表 (有 0 条评论,403人围观)

还没有评论,来说两句吧...

相关阅读