Spring Data Redis(Redis Messaging/PubSub)
Redis Messaging/PubSub
Spring Data为 Redis提供了专用的消息传送集成,与Spring 框架集成JMS有相似的功能和命名方式;实际上,对Spring 支持的JMS 熟悉的用户,则很容易对Redis 的消息传送上手。
Redis messaging在功能上可以粗略的分为两个部分:生产/发布 和 消费/订阅,因此简称pubsub(Publish/Subscribe)。RedisTemplate这个类是用来生产消息的。对于异步接收,与JavaEE的消息驱动bean风格很相似,Spring Data提供了一个专用的消息监听容器,用来创建消息驱动POJOs(MDPs);对于同步接收,则由RedisConnecton contract。
对于使用Redis messaging 提供了核心的功能的类或接口,都在下面这两个包内:org.springframework.data.redis.connection 和 org.springframework.data.redis.listener 。
1. Sending/Publishing messages
发送/发布消息
要发布一个消息,你可以像其他操作一样,使用底层的RedisConnection 或高级别的RedisTemplate 。两个实体都提供了发布的方法,方法需要传递参数:要发送的消息和目的地通道。然而,RedisConnetion 需要传递的消息类型是原生的数据(字节数组),而RedisTemplate 则允许任意的 objects。
伪代码示例:
// send message through connection RedisConnection con = ...
byte[] msg = ...
byte[] channel = ...
con.publish(msg, channel); // send message through RedisTemplate
RedisTemplate template = ...
template.convertAndSend("hello!", "world");
2. Receiving/Subscribing for messages
接收/订阅消息
在接收端,你可以订阅一个或多个通道,通过直接命名或使用模式匹配。使用模式匹配的方式是相当有用的,它不仅仅可以用一个命令创建多个订阅,还可以监听在订阅的时候还没有创建的通道,只要它们匹配模式即可。
在底层,RedisConnection 提供了subscribe 和pSubscribe 方法,来对应Redis 的订阅命令,并由各自的模式来匹配各个通道。注意,大多数的通道或模式都被作为参数传递使用的。要改变一个连接的订阅,或者简单的查询是不是被订阅了,可以使用RedisConnection 的getSubscription 和isSubscribed 方法。
在Spring Data Redis 中订阅的命令是阻塞的。就是说,在一个连接上调用subscribe 方法,会阻塞当前线程来等待消息,否则只有在该订阅被取消的情况下该线程才会被释放,取消订阅必须由另外一个线程在同一个连接上调用unsubscribe 或者pUnsubscribe方法。要解决上面的问题需要查看下面的消息监听容器章节。
像上面提到的一样,一旦订阅了一个连接开始等待消息,在该连接上将不会调用其他命令,除了添加新的订阅 或 修改/取消一个存在的订阅。就是说,除了subscribe、pSubscribe、unsubscribe或者 pUnsubscribe,调用任何其他命令都是非法的,将抛出一个异常。
为了订阅消息,你需要实现MessageListener callback:每当一个新消息到了后,这个callbakc 将会被调用,在onMessage 方法中的用户的代码将会被执行。通过这个接口可以访问到:真实的消息、接收消息的通道、让订阅来匹配通道的模式(如果有的话)。这些信息可以供调用者来区分各式各样的消息,不仅仅可以通过内容还可以通过数据来进行。
Message Listener Containers
由于它具有阻塞的本性,低层次的订阅并不被人所关注,因为对于每个单独的监听都需要管理连接和线程。为了缓和这个问题,Spring Data提供了RedisMessageListenerContainer ,它代替用户做了所有的繁重的工作,熟悉EJB 和JMS 的用户可能会理解这个观念,它被设计的尽可能的被Spring 框架和它的消息驱动POJOs(MDPs)支持。
RedisMessageListenerContainer 作为一个消息监听容器;它被用来从Redis 通道接收消息,驱动所有注入到它里面的MessageListener。这个监听容器负责所有消息接收,然后发送到监听器进行处理。一个消息监听容器是一个MDP和一个消息提供者的中间媒介,它关心的是注册接收消息、资源的获取和释放、异常的转换等类似的事情。这允许应用开发者专心去写涉及到接收消息的业务逻辑(可能很复杂),让框架来关注Redis 底层的基础。
此外,为了最小化应用程序的内存占用,RedisMessageListenerContainer 允许一个连接和一个线程可以被多个监听器共享,即使它们不共享一个订阅都可以。因此无论一个应用有多少个监听器或通道,在它的生命周期内运行时的内存消耗都是一样的。此外,容器允许在运行的时候对配置信息进行修改,所以当应用运行中可以添加或删除监听器,而不用重启应用。加之容器还使用了懒订阅的方式,因此只有在需要的时候才使用RedisConnection ,如果所有的监听器都没有被订阅,会自动执行清除,释放掉所有的线程。
为了处理异步方式的消息,容器需要一个 java.util.concurrent.Executor ( 或者Spring 的TaskExecutor)来发送消息。依据机器的负载、监听器的数量或运行时的环境,你应该通过修改或设置执行器使它能更好的服务。尤其是在托管的环境下(像app服务),强烈的建议选择一个适当的TaskExecutor,使其在运行时能充分的发挥作用。
The MessageListenerAdapter
MessageListenerAdapter 这个类是Spring 异步消息发送支持的最后一个组件:简单的说,它允许你将几乎任何一个类转化为一个MDP(当然也有一些限制)。
考虑以下接口的定义。注意尽管这个接口不能继承MessageListener 接口,但它通过使用MessageListenerAdapter 这个类仍可以作为一个MDP。还要注意,依据可以接收和处理的各种消息类型的内容,各种消息的处理方法是如何强化类型的。此外,消息发送使用的通道或模式,以String类型作为第二个参数传入方法。
代码示例如下:
public interface MessageDelegate {
void handleMessage(String message);
void handleMessage(Map message); void handleMessage(byte[] message);
void handleMessage(Serializable message);
// pass the channel/pattern as well
void handleMessage(Serializable message, String channel);
}
public class DefaultMessageDelegate implements MessageDelegate {
// implementation elided for clarity...
}
特别应该注意的是,上面的MessageDelegate 接口的实现类(如DefaultMessageDelegate )已经完全不依赖于Redis了。它只是一个POJO,我们将通过下面的配置将它变成一个MDP。
配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:redis="http://www.springframework.org/schema/redis" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis.xsd">
<!-- the default ConnectionFactory -->
<redis:listener-container>
<!-- the method attribute can be skipped as the default method name is "handleMessage" -->
<redis:listener ref="listener" method="handleMessage" topic="chatroom" />
</redis:listener-container>
<bean id="listener" class="redisexample.DefaultMessageDelegate"/>
...
<beans>
The listener topic can be either a channel (e.g. topic="chatroom") or a pattern (e.g. topic="*room")
监听器的主题可以是一个通道或一个模式。
上面示例使用Redis的命名空间声明了消息的监听器容器,并自动将POJOs注册为一个监听器。beans的定义展示如下:
<bean id="messageListener" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="redisexample.DefaultMessageDelegate"/>
</constructor-arg>
</bean>
<bean id="redisContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="messageListeners">
<map>
<entry key-ref="messageListener">
<bean class="org.springframework.data.redis.listener.ChannelTopic">
<constructor-arg value="chatroom">
</bean>
</entry>
</map>
</property>
</bean>
每收到一个消息,这个适配器会自动的执行转化(使用配置的RedisSerializer),显示的将低级的格式转化为需要的对象类型,或反之。方法调用引起的任何异常,将会被容器捕获并处理。
还没有评论,来说两句吧...