发布/订阅消息传递
Spring Data 为 Redis 提供了专用的消息传递集成,其功能和命名类似于 Spring Framework 中的 JMS 集成。
Redis 消息传递大致可以分为两个功能领域:
-
消息的发布或生产
-
消息的订阅或消费
这是一个通常被称为发布/订阅(简称 Pub/Sub)模式的示例。RedisTemplate
类用于消息的生产。对于类似于 Java EE 消息驱动 bean 风格的异步接收,Spring Data 提供了一个专用的消息监听容器,用于创建消息驱动的 POJO(MDPs);而对于同步接收,则使用 RedisConnection
合约。
org.springframework.data.redis.connection
和 org.springframework.data.redis.listener
包提供了 Redis 消息传递的核心功能。
发布(发送消息) {#redis:pubsub:publish}
要发布消息,与其他操作一样,你可以使用低级别的 [Reactive]RedisConnection
或高级别的 [Reactive]RedisOperations
。这两个实体都提供了 publish
方法,该方法接受消息和目标频道作为参数。虽然 RedisConnection
需要原始数据(字节数组),但 [Reactive]RedisOperations
允许将任意对象作为消息传递,如下例所示:
- Imperative
- Reactive
// send message through connection
RedisConnection con = …
byte[] msg = …
byte[] channel = …
con.pubSubCommands().publish(msg, channel);
// send message through RedisOperations
RedisOperations operations = …
Long numberOfClients = operations.convertAndSend("hello!", "world");
// send message through connection
ReactiveRedisConnection con = …
ByteBuffer[] msg = …
ByteBuffer[] channel = …
con.pubSubCommands().publish(msg, channel);
// send message through ReactiveRedisOperations
ReactiveRedisOperations operations = …
Mono<Long> numberOfClients = operations.convertAndSend("hello!", "world");
订阅(接收消息) {#redis:pubsub:subscribe}
在接收端,可以通过直接命名或使用模式匹配来订阅一个或多个频道。后一种方法非常有用,因为它不仅可以通过一条命令创建多个订阅,还可以监听在订阅时尚未创建的频道(只要它们匹配该模式)。
在底层,RedisConnection
提供了 subscribe
和 pSubscribe
方法,分别映射了 Redis 的按频道订阅和按模式订阅的命令。需要注意的是,可以传入多个频道或模式作为参数。若要更改连接的订阅状态或查询其是否正在监听,RedisConnection
提供了 getSubscription
和 isSubscribed
方法。
Spring Data Redis 中的订阅命令是阻塞的。也就是说,在连接上调用 subscribe
会导致当前线程阻塞,因为它开始等待消息。只有当订阅被取消时,线程才会被释放,这发生在另一个线程在同一个连接上调用 unsubscribe
或 pUnsubscribe
时。有关此问题的解决方案,请参见“消息监听容器”(本文档后面部分)。
如前所述,一旦订阅了,连接就会开始等待消息。只有添加新订阅、修改现有订阅和取消现有订阅的命令是被允许的。调用除了 subscribe
、pSubscribe
、unsubscribe
或 pUnsubscribe
之外的任何命令都会抛出异常。
为了订阅消息,需要实现 MessageListener
回调。每当新消息到达时,回调会被触发,用户代码通过 onMessage
方法运行。该接口不仅提供了对实际消息的访问,还提供了接收消息的通道以及订阅用于匹配通道的模式(如果有)。这些信息使被调用者不仅可以通过内容区分不同的消息,还可以通过检查其他细节来区分。
消息监听容器 {#redis:pubsub:subscribe:containers}
由于其阻塞性质,低级别订阅并不具有吸引力,因为它需要为每个监听器管理连接和线程。为了缓解这个问题,Spring Data 提供了 RedisMessageListenerContainer,它完成了所有的繁重工作。如果你熟悉 EJB 和 JMS,你会发现这些概念非常相似,因为它被设计为尽可能接近 Spring Framework 及其消息驱动 POJO(MDPs)的支持。
RedisMessageListenerContainer 充当消息监听容器的角色。它用于从 Redis 通道接收消息,并驱动注入其中的 MessageListener 实例。该监听容器负责消息接收的所有线程管理,并将消息分派给监听器进行处理。消息监听容器是消息驱动 POJO(MDP)与消息提供者之间的中介,负责注册以接收消息、资源的获取和释放、异常转换等。这使得应用程序开发人员能够专注于编写与接收消息(并对其作出反应)相关的(可能复杂的)业务逻辑,而将 Redis 基础设施的样板代码委托给框架处理。
一个 MessageListener 还可以实现 SubscriptionListener 接口,以便在订阅/取消订阅确认时接收通知。在同步调用时,监听订阅通知可能会非常有用。
此外,为了最小化应用程序的资源占用,RedisMessageListenerContainer 允许多个监听器共享一个连接和一个线程,即使它们没有共享订阅。因此,无论应用程序跟踪了多少监听器或频道,其运行时的成本在整个生命周期中保持不变。此外,该容器允许运行时进行配置更改,因此您可以在应用程序运行时添加或移除监听器,而无需重新启动。另外,容器采用延迟订阅的方式,仅在需要时使用 RedisConnection
。如果所有监听器都取消订阅,系统会自动执行清理并释放线程。
为了处理消息的异步特性,容器需要一个 java.util.concurrent.Executor
(或 Spring 的 TaskExecutor
)来分派消息。根据负载、监听器的数量或运行时环境的不同,你应该更改或调整这个执行器,以更好地满足你的需求。特别是在受管理的环境(如应用服务器)中,强烈建议选择一个合适的 TaskExecutor
,以便充分利用其运行时特性。
MessageListenerAdapter 适配器 {#redis:pubsub:subscribe:adapter}
MessageListenerAdapter 类是 Spring 异步消息支持的最终组件。简而言之,它允许你将几乎 任何 类暴露为 MDP(尽管有一些限制)。
考虑以下接口定义:
public interface MessageDelegate {
void handleMessage(String message);
void handleMessage(Map message);
void handleMessage(byte[] message);
void handleMessage(Serializable message);
// pass the channel/pattern as well
void handleMessage(Serializable message, String channel);
}
请注意,尽管该接口没有扩展 MessageListener
接口,但它仍然可以通过使用 MessageListenerAdapter 类作为消息驱动型 POJO(MDP)使用。还请注意,各种消息处理方法是如何根据它们可以接收和处理的 Message
类型的内容进行强类型化的。此外,消息发送到的通道或模式可以作为第二个参数(类型为 String
)传递给方法:
public class DefaultMessageDelegate implements MessageDelegate {
// implementation elided for clarity...
}
Notice how the above implementation of the `MessageDelegate` interface (the above `DefaultMessageDelegate` class) has *no* Redis dependencies at all. It truly is a POJO that we make into an MDP with the following configuration:
- Java
- XML
@Configuration
class MyConfig {
// …
@Bean
DefaultMessageDelegate listener() {
return new DefaultMessageDelegate();
}
@Bean
MessageListenerAdapter messageListenerAdapter(DefaultMessageDelegate listener) {
return new MessageListenerAdapter(listener, "handleMessage");
}
@Bean
RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory, MessageListenerAdapter listener) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listener, ChannelTopic.of("chatroom"));
return container;
}
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:redis="http://www.springframework.org/schema/redis"
xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/redis https://www.springframework.org/schema/redis/spring-redis.xsd">
<!-- the default ConnectionFactory -->
<redis:listener-container>
<!-- the method attribute can be skipped as the default method name is "handleMessage" -->
<redis:listener ref="listener" method="handleMessage" topic="chatroom" />
</redis:listener-container>
<bean id="listener" class="redisexample.DefaultMessageDelegate"/>
...
</beans>
监听器的主题可以是一个频道(例如:topic="chatroom"
),也可以是一个模式(例如:topic="*room"
)。
前面的示例使用了 Redis 命名空间来声明消息监听容器,并自动将 POJOs 注册为监听器。完整的 bean 定义如下:
<bean id="messageListener" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="redisexample.DefaultMessageDelegate"/>
</constructor-arg>
</bean>
<bean id="redisContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="messageListeners">
<map>
<entry key-ref="messageListener">
<bean class="org.springframework.data.redis.listener.ChannelTopic">
<constructor-arg value="chatroom"/>
</bean>
</entry>
</map>
</property>
</bean>
每次接收到消息时,适配器会自动且透明地在底层格式与所需对象类型之间执行转换(使用配置的 RedisSerializer
)。方法调用引起的任何异常都会被容器捕获并处理(默认情况下,异常会被记录到日志中)。
响应式消息监听容器 {#redis:reactive:pubsub:subscribe:containers}
Spring Data 提供了 ReactiveRedisMessageListenerContainer,它代表用户完成了转换和订阅状态管理的繁重工作。
消息监听容器本身不需要外部线程资源。它使用驱动线程来发布消息。
ReactiveRedisConnectionFactory factory = …
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);
Flux<ChannelMessage<String, String>> stream = container.receive(ChannelTopic.of("my-channel"));
为了等待并确保正确订阅,你可以使用 receiveLater
方法,该方法返回一个 Mono<Flux<ChannelMessage>>
。生成的 Mono
在完成对给定主题的订阅后,会以一个内部发布者完成。通过拦截 onNext
信号,你可以同步服务器端的订阅。
ReactiveRedisConnectionFactory factory = …
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);
Mono<Flux<ChannelMessage<String, String>>> stream = container.receiveLater(ChannelTopic.of("my-channel"));
stream.doOnNext(inner -> // notification hook when Redis subscriptions are synchronized with the server)
.flatMapMany(Function.identity())
.…;
通过模板 API 订阅 {#redis:reactive:pubsub:subscribe:template}
如上所述,你可以直接使用 ReactiveRedisTemplate 来订阅频道或模式。这种方法提供了一个简单直接的解决方案,尽管它有一定的限制,因为你无法在初始订阅之后添加新的订阅。不过,你仍然可以通过返回的 Flux
来控制消息流,例如使用 take(Duration)
。当读取完成、发生错误或取消订阅时,所有绑定的资源都会被释放。
redisTemplate.listenToChannel("channel1", "channel2").doOnNext(msg -> {
// message processing ...
}).subscribe();