Springboot+Redis实现消息队列(发布订阅模式)

川长思鸟来 2024-02-19 00:33 138阅读 0赞

我上找了很多例子,但是都觉得不太明白,自己琢磨了一下,做了个笔记!

因为spring对Redis进行了完整的封装,所以实现起来就比较简单。

pom文件

  1. <!-- 整合redis -->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-data-redis</artifactId>
  5. </dependency>

Redis配置文件(配置方式比较多,具体根据情况而定):

  1. redis:
  2. # Redis数据库索引(默认为0)
  3. database: 0
  4. # Redis服务器地址
  5. host: 127.0.0.1
  6. # Redis服务器连接端口
  7. port: 6379
  8. #Redis服务器连接密码(默认为空)
  9. password:
  10. # 配置多节点
  11. #cluster:
  12. # nodes: 10.201.1.27:7001,10.201.1.27:7002,10.201.1.27:7003
  13. pool:
  14. max-active: 8 # 连接池最大连接数(使用负值表示没有限制)
  15. max-wait: -1 # 连接池最大阻塞等待时间(使用负值表示没有限制)
  16. max-idle: 8 # 连接池中的最大空闲连接
  17. min-idle: 0 # 连接池中的最小空闲连接
  18. timeout: 100 # 连接超时时间(毫秒)

1, 消息发布代码

消息发布的实现是非常简单的,只需要使用 StringRedisTemplate的convertAndSend(channel, message)方法即可,

其中channel代表消息信道也可以理解为主题,message表示发布的内容。

消息发布类 ,模拟service层的逻辑处理

  1. package com.boot.test1.service;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.data.redis.core.StringRedisTemplate;
  4. import org.springframework.stereotype.Service;
  5. /**
  6. * 消息发布类
  7. * @author zhh
  8. */
  9. @Service
  10. public class Test01Service {
  11. @Autowired
  12. private StringRedisTemplate stringRedisTemplate;
  13. /**
  14. * 发布消息
  15. * @param channel 消息信道
  16. * @param message 消息内容
  17. */
  18. public void sendMessage(String channel, String message) {
  19. stringRedisTemplate.convertAndSend(channel, message);
  20. }
  21. /**
  22. * 发布消息的方法
  23. */
  24. public void setStr01(){
  25. this.sendMessage("mq_01", "发送信息内容01");
  26. this.sendMessage("mq_01", "发送信息内容011");
  27. this.sendMessage("mq_02", "发送信息内容02");
  28. }
  29. }

2,消息接收、处理代码

2.1 消息监听注册配置

主要把消息监听注册到容器里面

  1. package com.boot.common.conf;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.data.redis.connection.RedisConnectionFactory;
  5. import org.springframework.data.redis.listener.PatternTopic;
  6. import org.springframework.data.redis.listener.RedisMessageListenerContainer;
  7. import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
  8. import com.boot.test1.redismq.MessageReceiver;
  9. /**
  10. * Redis消息处理配置类
  11. * @author zhh
  12. * @date 2018-09-30
  13. *
  14. */
  15. @Configuration
  16. public class RedisMQConfig {
  17. /**
  18. * 注入消息监听容器
  19. * @param connectionFactory 连接工厂
  20. * @param listenerAdapter 监听处理器1
  21. * @param listenerAdapter 监听处理器2 (参数名称需和监听处理器的方法名称一致,因为@Bean注解默认注入的id就是方法名称)
  22. * @return
  23. */
  24. @Bean
  25. RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
  26. MessageListenerAdapter listenerAdapter,
  27. MessageListenerAdapter listenerAdapter2) {
  28. RedisMessageListenerContainer container = new RedisMessageListenerContainer();
  29. container.setConnectionFactory(connectionFactory);
  30. //订阅一个叫mq_01 的信道
  31. container.addMessageListener(listenerAdapter, new PatternTopic("mq_01"));
  32. //订阅一个叫mq_02 的信道
  33. container.addMessageListener(listenerAdapter2, new PatternTopic("mq_02"));
  34. //这个container 可以添加多个 messageListener
  35. return container;
  36. }
  37. /**
  38. * 消息监听处理器1
  39. * @param receiver 处理器类
  40. * @return
  41. */
  42. @Bean
  43. MessageListenerAdapter listenerAdapter(MessageReceiver receiver) {
  44. //给messageListenerAdapter 传入一个消息接收的处理器,利用反射的方法调用“receiveMessage”
  45. return new MessageListenerAdapter(receiver, "receiveMessage"); //receiveMessage:接收消息的方法名称
  46. }
  47. /**
  48. * 消息监听处理器2
  49. * @param receiver 处理器类
  50. * @return
  51. */
  52. @Bean
  53. MessageListenerAdapter listenerAdapter2(MessageReceiver receiver) {
  54. //给messageListenerAdapter 传入一个消息接收的处理器,利用反射的方法调用“receiveMessage2”
  55. return new MessageListenerAdapter(receiver, "receiveMessage2"); //receiveMessage:接收消息的方法名称
  56. }
  57. }

2.2 自定义的消息处理器

  1. package com.boot.test1.redismq;
  2. import org.springframework.stereotype.Component;
  3. /**
  4. * MQ消息处理器
  5. * @author zhh
  6. */
  7. @Component
  8. public class MessageReceiver {
  9. /**
  10. * 接收消息的方法1
  11. **/
  12. public void receiveMessage(String message){
  13. System.out.println("receiveMessage接收到的消息:"+message);
  14. }
  15. /**
  16. * 接收消息的方法2
  17. **/
  18. public void receiveMessage2(String message){
  19. System.out.println("receiveMessage2接收到的消息:"+message);
  20. }
  21. }

具体的结束在代码中有相应的注释,这个是最基本的实现方式,具体业务需要具体分析。

发表评论

表情:
评论列表 (有 0 条评论,138人围观)

还没有评论,来说两句吧...

相关阅读