跳到主要内容

发布/订阅消息传递

DeepSeek V3 中英对照 Pub/Sub Messaging

Spring Data 为 Redis 提供了专用的消息传递集成,其功能和命名类似于 Spring Framework 中的 JMS 集成。

Redis 消息传递大致可以分为两个功能领域:

  • 消息的发布或生产

  • 消息的订阅或消费

这是一个通常被称为发布/订阅(简称 Pub/Sub)模式的示例。RedisTemplate 类用于消息的生产。对于类似于 Java EE 消息驱动 bean 风格的异步接收,Spring Data 提供了一个专用的消息监听容器,用于创建消息驱动的 POJO(MDPs);而对于同步接收,则使用 RedisConnection 合约。

org.springframework.data.redis.connectionorg.springframework.data.redis.listener 包提供了 Redis 消息传递的核心功能。

发布(发送消息) {#redis:pubsub:publish}

要发布消息,与其他操作一样,你可以使用低级别的 [Reactive]RedisConnection 或高级别的 [Reactive]RedisOperations。这两个实体都提供了 publish 方法,该方法接受消息和目标频道作为参数。虽然 RedisConnection 需要原始数据(字节数组),但 [Reactive]RedisOperations 允许将任意对象作为消息传递,如下例所示:

// send message through connection
RedisConnection con =
byte[] msg =
byte[] channel =
con.pubSubCommands().publish(msg, channel);

// send message through RedisOperations
RedisOperations operations =
Long numberOfClients = operations.convertAndSend("hello!", "world");
java

订阅(接收消息) {#redis:pubsub:subscribe}

在接收端,可以通过直接命名或使用模式匹配来订阅一个或多个频道。后一种方法非常有用,因为它不仅可以通过一条命令创建多个订阅,还可以监听在订阅时尚未创建的频道(只要它们匹配该模式)。

在底层,RedisConnection 提供了 subscribepSubscribe 方法,分别映射了 Redis 的按频道订阅和按模式订阅的命令。需要注意的是,可以传入多个频道或模式作为参数。若要更改连接的订阅状态或查询其是否正在监听,RedisConnection 提供了 getSubscriptionisSubscribed 方法。

备注

Spring Data Redis 中的订阅命令是阻塞的。也就是说,在连接上调用 subscribe 会导致当前线程阻塞,因为它开始等待消息。只有当订阅被取消时,线程才会被释放,这发生在另一个线程在同一个连接上调用 unsubscribepUnsubscribe 时。有关此问题的解决方案,请参见“消息监听容器”(本文档后面部分)。

如前所述,一旦订阅了,连接就会开始等待消息。只有添加新订阅、修改现有订阅和取消现有订阅的命令是被允许的。调用除了 subscribepSubscribeunsubscribepUnsubscribe 之外的任何命令都会抛出异常。

为了订阅消息,需要实现 MessageListener 回调。每当新消息到达时,回调会被触发,用户代码通过 onMessage 方法运行。该接口不仅提供了对实际消息的访问,还提供了接收消息的通道以及订阅用于匹配通道的模式(如果有)。这些信息使被调用者不仅可以通过内容区分不同的消息,还可以通过检查其他细节来区分。

消息监听容器 {#redis:pubsub:subscribe:containers}

由于其阻塞性质,低级别订阅并不具有吸引力,因为它需要为每个监听器管理连接和线程。为了缓解这个问题,Spring Data 提供了 RedisMessageListenerContainer,它完成了所有的繁重工作。如果你熟悉 EJB 和 JMS,你会发现这些概念非常相似,因为它被设计为尽可能接近 Spring Framework 及其消息驱动 POJO(MDPs)的支持。

RedisMessageListenerContainer 充当消息监听容器的角色。它用于从 Redis 通道接收消息,并驱动注入其中的 MessageListener 实例。该监听容器负责消息接收的所有线程管理,并将消息分派给监听器进行处理。消息监听容器是消息驱动 POJO(MDP)与消息提供者之间的中介,负责注册以接收消息、资源的获取和释放、异常转换等。这使得应用程序开发人员能够专注于编写与接收消息(并对其作出反应)相关的(可能复杂的)业务逻辑,而将 Redis 基础设施的样板代码委托给框架处理。

一个 MessageListener 还可以实现 SubscriptionListener 接口,以便在订阅/取消订阅确认时接收通知。在同步调用时,监听订阅通知可能会非常有用。

此外,为了最小化应用程序的资源占用,RedisMessageListenerContainer 允许多个监听器共享一个连接和一个线程,即使它们没有共享订阅。因此,无论应用程序跟踪了多少监听器或频道,其运行时的成本在整个生命周期中保持不变。此外,该容器允许运行时进行配置更改,因此您可以在应用程序运行时添加或移除监听器,而无需重新启动。另外,容器采用延迟订阅的方式,仅在需要时使用 RedisConnection。如果所有监听器都取消订阅,系统会自动执行清理并释放线程。

为了处理消息的异步特性,容器需要一个 java.util.concurrent.Executor(或 Spring 的 TaskExecutor)来分派消息。根据负载、监听器的数量或运行时环境的不同,你应该更改或调整这个执行器,以更好地满足你的需求。特别是在受管理的环境(如应用服务器)中,强烈建议选择一个合适的 TaskExecutor,以便充分利用其运行时特性。

MessageListenerAdapter 适配器 {#redis:pubsub:subscribe:adapter}

MessageListenerAdapter 类是 Spring 异步消息支持的最终组件。简而言之,它允许你将几乎 任何 类暴露为 MDP(尽管有一些限制)。

考虑以下接口定义:

public interface MessageDelegate {
void handleMessage(String message);
void handleMessage(Map message);
void handleMessage(byte[] message);
void handleMessage(Serializable message);
// pass the channel/pattern as well
void handleMessage(Serializable message, String channel);
}
java

请注意,尽管该接口没有扩展 MessageListener 接口,但它仍然可以通过使用 MessageListenerAdapter 类作为消息驱动型 POJO(MDP)使用。还请注意,各种消息处理方法是如何根据它们可以接收和处理的 Message 类型的内容进行强类型化的。此外,消息发送到的通道或模式可以作为第二个参数(类型为 String)传递给方法:

public class DefaultMessageDelegate implements MessageDelegate {
// implementation elided for clarity...
}
java
Notice how the above implementation of the `MessageDelegate` interface (the above `DefaultMessageDelegate` class) has *no* Redis dependencies at all. It truly is a POJO that we make into an MDP with the following configuration:
@Configuration
class MyConfig {

// …

@Bean
DefaultMessageDelegate listener() {
return new DefaultMessageDelegate();
}

@Bean
MessageListenerAdapter messageListenerAdapter(DefaultMessageDelegate listener) {
return new MessageListenerAdapter(listener, "handleMessage");
}

@Bean
RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory, MessageListenerAdapter listener) {

RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listener, ChannelTopic.of("chatroom"));
return container;
}
}
java
备注

监听器的主题可以是一个频道(例如:topic="chatroom"),也可以是一个模式(例如:topic="*room")。

前面的示例使用了 Redis 命名空间来声明消息监听容器,并自动将 POJOs 注册为监听器。完整的 bean 定义如下:

<bean id="messageListener" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="redisexample.DefaultMessageDelegate"/>
</constructor-arg>
</bean>

<bean id="redisContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="messageListeners">
<map>
<entry key-ref="messageListener">
<bean class="org.springframework.data.redis.listener.ChannelTopic">
<constructor-arg value="chatroom"/>
</bean>
</entry>
</map>
</property>
</bean>
xml

每次接收到消息时,适配器会自动且透明地在底层格式与所需对象类型之间执行转换(使用配置的 RedisSerializer)。方法调用引起的任何异常都会被容器捕获并处理(默认情况下,异常会被记录到日志中)。

响应式消息监听容器 {#redis:reactive:pubsub:subscribe:containers}

Spring Data 提供了 ReactiveRedisMessageListenerContainer,它代表用户完成了转换和订阅状态管理的繁重工作。

消息监听容器本身不需要外部线程资源。它使用驱动线程来发布消息。

ReactiveRedisConnectionFactory factory =
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);

Flux<ChannelMessage<String, String>> stream = container.receive(ChannelTopic.of("my-channel"));
java

为了等待并确保正确订阅,你可以使用 receiveLater 方法,该方法返回一个 Mono<Flux<ChannelMessage>>。生成的 Mono 在完成对给定主题的订阅后,会以一个内部发布者完成。通过拦截 onNext 信号,你可以同步服务器端的订阅。

ReactiveRedisConnectionFactory factory =
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);

Mono<Flux<ChannelMessage<String, String>>> stream = container.receiveLater(ChannelTopic.of("my-channel"));

stream.doOnNext(inner -> // notification hook when Redis subscriptions are synchronized with the server)
.flatMapMany(Function.identity())
.;
java

通过模板 API 订阅 {#redis:reactive:pubsub:subscribe:template}

如上所述,你可以直接使用 ReactiveRedisTemplate 来订阅频道或模式。这种方法提供了一个简单直接的解决方案,尽管它有一定的限制,因为你无法在初始订阅之后添加新的订阅。不过,你仍然可以通过返回的 Flux 来控制消息流,例如使用 take(Duration)。当读取完成、发生错误或取消订阅时,所有绑定的资源都会被释放。

redisTemplate.listenToChannel("channel1", "channel2").doOnNext(msg -> {
// message processing ...
}).subscribe();
java