Spring Boot 2 with Spring Data Redis: implementing a message queue in publish/subscribe mode
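Generally speaking, message queues are used in two ways: the publisher/subscriber model and the producer/consumer model.
Producer/consumer model:
It is like chatting with a friend on WeChat (group chats excluded). WeChat is the queue: we can chat with many friends, but each message is delivered to exactly one of them.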
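To make the "one message, one receiver" rule concrete, here is a minimal in-memory sketch in plain Java. It is not Redis code: the class name QueueModelDemo, the deliver helper and the round-robin choice of consumer are assumptions made only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy point-to-point queue (hypothetical helper, not a Redis API).
class QueueModelDemo {

    // Hands every queued message to exactly one consumer and returns a delivery log.
    static List<String> deliver(List<String> messages, List<String> consumers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(messages);
        List<String> log = new ArrayList<>();
        int next = 0;
        String message;
        // poll() removes the head of the queue, so a message can never be handed out twice.
        while ((message = queue.poll()) != null) {
            // Assumption: consumers take turns (round-robin) to simulate competing consumers.
            String consumer = consumers.get(next++ % consumers.size());
            log.add(consumer + " <- " + message);
        }
        return log;
    }

    public static void main(String[] args) {
        // prints: A <- m1, B <- m2, A <- m3 (one line each)
        deliver(Arrays.asList("m1", "m2", "m3"), Arrays.asList("A", "B"))
                .forEach(System.out::println);
    }
}
```

Each message appears once in the log, no matter how many consumers are attached.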
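Publisher/subscriber model:
The publish/subscribe model works like following a WeChat official account: every subscriber receives each message. The publisher creates a topic, and consumers subscribe to it.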
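The broadcast behaviour can be sketched with a toy in-memory topic in plain Java. Again, this is not Redis code: TopicModelDemo and its methods are hypothetical names for illustration. The sketch also shows that a message published before anyone subscribes is simply dropped, which matches the fire-and-forget nature of Redis Pub/Sub.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy in-memory topic (hypothetical helper, not a Redis API).
class TopicModelDemo {

    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Delivers the message to every current subscriber and returns how many received it.
    int publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
        return subscribers.size();
    }

    // Runs a small scenario and returns what the subscribers saw, in delivery order.
    static List<String> scenario() {
        List<String> received = new ArrayList<>();
        TopicModelDemo topic = new TopicModelDemo();
        topic.publish("early"); // nobody has subscribed yet, so this message is lost
        topic.subscribe(m -> received.add("A:" + m));
        topic.subscribe(m -> received.add("B:" + m));
        topic.publish("hello"); // both subscribers receive this one
        return received;
    }

    public static void main(String[] args) {
        System.out.println(scenario()); // prints [A:hello, B:hello]
    }
}
```

Compared with the producer/consumer model, the message is copied to every subscriber instead of being removed by the first one that takes it.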
Next, let's implement the subscriber model with Spring Boot 2 and Spring Data Redis:
Publish/subscribe with Spring Data Redis requires configuring the following:
- Topic
- MessageListener
- RedisMessageListenerContainer
1》Dependencies
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.5.2</version>
</parent>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<!-- No version here: spring-boot-starter-parent manages it, which keeps all Jackson modules on the same version -->
</dependency>
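Spring Boot integrates with three JSON mapping libraries: Gson, Jackson and JSON-B. Jackson is the preferred one and the default.
Jackson is part of spring-boot-starter-json, which spring-boot-starter-web includes. In other words, adding spring-boot-starter-web to a project brings in spring-boot-starter-json automatically.
This project is not a web project, though, so Jackson has to be added explicitly. In a web project this extra dependency is unnecessary.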
2》Configure Spring Data Redis:
package com.example.messagingredis;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {

    // The listener that handles every message arriving on the topic.
    @Bean
    public ConsumerRedisListener consumeRedis() {
        return new ConsumerRedisListener();
    }

    // The channel that publishers send to and the listener subscribes to.
    @Bean
    public ChannelTopic topic() {
        return new ChannelTopic("topic");
    }

    // The container subscribes to Redis and dispatches incoming messages to the listener.
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(consumeRedis(), topic());
        return container;
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);

        // Serialize values as JSON that carries type information, so they can be read back as objects.
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer =
                new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // activateDefaultTyping replaces enableDefaultTyping, which is deprecated since Jackson 2.10.
        // LaissezFaireSubTypeValidator accepts any type, so use it only with trusted data.
        om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);

        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setKeySerializer(new StringRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        template.afterPropertiesSet();
        return template;
    }
}
3》Implement a plain MessageListener class for the topic:
package com.example.messagingredis;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;

public class ConsumerRedisListener implements MessageListener {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        doBusiness(message);
    }

    /**
     * Prints the content of the message body.
     *
     * @param message the message received from the subscribed channel
     */
    public void doBusiness(Message message) {
        // Deserialize with the same serializer that convertAndSend used when publishing.
        Object value = redisTemplate.getValueSerializer().deserialize(message.getBody());
        // String.valueOf avoids a NullPointerException if the body deserializes to null.
        System.out.println("consumer message: " + String.valueOf(value));
    }
}
Entity class:
package com.example.messagingredis;

import java.io.Serializable;

public class MessageEntity implements Serializable {

    private static final long serialVersionUID = 8632296967087444509L;

    private String id;
    private String content;

    public MessageEntity() {
        super();
    }

    public MessageEntity(String id, String content) {
        super();
        this.id = id;
        this.content = content;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    @Override
    public String toString() {
        return "MessageEntity [id=" + id + ", content=" + content + "]";
    }
}
4》Other:
Remember to add the Redis connection settings. The simplest application.properties looks like this:
spring.redis.host=127.0.0.1
spring.redis.port=6379
5》Add a test class:
package com.example.messagingredis;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.data.redis.core.RedisTemplate;

import java.util.Date;

@SpringBootApplication
public class MessagingRedisApplication {

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        ApplicationContext ctx = SpringApplication.run(MessagingRedisApplication.class, args);
        // Fetch the configured template from the context instead of calling the @Bean method with a null factory.
        // Look it up by bean name: Spring Boot also registers a StringRedisTemplate,
        // so a lookup by the raw RedisTemplate type would be ambiguous.
        RedisTemplate<String, Object> redisTemplate =
                (RedisTemplate<String, Object>) ctx.getBean("redisTemplate");
        // Must match the channel name of the ChannelTopic bean in RedisConfig.
        String channel = "topic";
        redisTemplate.convertAndSend(channel, "hello world");
        redisTemplate.convertAndSend(channel, new Date());
        redisTemplate.convertAndSend(channel, new MessageEntity("1", "object"));
    }
}
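Run it, and the console prints the messages consumed by the subscriber.
To sum up:
Implementing a Redis subscriber with Spring Data Redis is essentially the listener pattern: you only need to configure a Topic, a MessageListener and a RedisMessageListenerContainer (the beans in RedisConfig). To publish, call convertAndSend on redisTemplate with the topic's channel name and the message.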
References: https://my.oschina.net/simonton/blog/1833775
https://blog.csdn.net/johnf_nash/article/details/87891293