跳到主要内容

Redis 流

DeepSeek V3 中英对照 Redis Streams

Redis Streams 以一种抽象的方式模拟了日志数据结构。通常,日志是仅追加的数据结构,可以从开头、随机位置消费,或者通过流式传输新消息来消费。

备注

Redis 参考文档 中了解更多关于 Redis Streams 的内容。

Redis Streams 大致可以分为两个功能领域:

  • 追加记录

  • 消费记录

尽管这种模式与 发布/订阅 有相似之处,但主要区别在于消息的持久性以及消息的消费方式。

虽然 Pub/Sub 依赖于瞬时消息的广播(即如果你没有监听,就会错过一条消息),但 Redis Stream 使用一种持久化的、仅追加的数据类型,它会保留消息直到流被修剪。另一个消费上的区别是,Pub/Sub 注册了一个服务器端的订阅。Redis 将到达的消息推送给客户端,而 Redis Streams 则需要主动轮询。

org.springframework.data.redis.connectionorg.springframework.data.redis.stream 包提供了 Redis Streams 的核心功能。

追加

要发送一条记录,与其他操作一样,你可以使用低级别的 RedisConnection 或高级别的 StreamOperations。这两个实体都提供了 addxAdd)方法,该方法接受记录和目标流作为参数。虽然 RedisConnection 需要原始数据(字节数组),但 StreamOperations 允许将任意对象作为记录传入,如下例所示:

// append message through connection
RedisConnection con =
byte[] stream =
ByteRecord record = StreamRecords.rawBytes().withStreamKey(stream);
con.xAdd(record);

// append message through RedisTemplate
RedisTemplate template =
StringRecord record = StreamRecords.string().withStreamKey("my-stream");
template.opsForStream().add(record);
java

流记录携带一个 Map,即键值对元组,作为其有效载荷。将记录追加到流中会返回 RecordId,该 ID 可以用作进一步参考。

消费

在消费端,可以消费一个或多个流。Redis Streams 提供了读取命令,允许从已知流内容中的任意位置(随机访问)开始消费,并且可以超越流的末尾以消费新的流记录。

在底层,RedisConnection 提供了 xReadxReadGroup 方法,分别映射了 Redis 命令用于读取和在消费者组内读取。请注意,可以使用多个流作为参数。

备注

在 Redis 中,订阅命令可能是阻塞的。也就是说,在连接上调用 xRead 会导致当前线程阻塞,因为它开始等待消息。只有在读取命令超时或接收到消息时,线程才会被释放。

为了消费流消息,可以在应用程序代码中轮询消息,或者使用两种通过消息监听器容器进行异步接收中的一种,即命令式或响应式。每当新记录到达时,容器会通知应用程序代码。

同步接收

虽然流消费通常与异步处理相关联,但也可以同步消费消息。重载的 StreamOperations.read(…) 方法提供了这一功能。在同步接收过程中,调用线程可能会阻塞,直到有消息可用。属性 StreamReadOptions.block 指定了接收方在放弃等待消息之前应该等待多长时间。

// Read message through RedisTemplate
RedisTemplate template =

List<MapRecord<K, HK, HV>> messages = template.opsForStream().read(StreamReadOptions.empty().count(2),
StreamOffset.latest("my-stream"));

List<MapRecord<K, HK, HV>> messages = template.opsForStream().read(Consumer.from("my-group", "my-consumer"),
StreamReadOptions.empty().count(2),
StreamOffset.create("my-stream", ReadOffset.lastConsumed()))
java

通过消息监听容器进行异步接收

由于其阻塞特性,低级别的轮询并不吸引人,因为它需要为每个消费者进行连接和线程管理。为了缓解这个问题,Spring Data 提供了消息监听器,它们负责处理所有繁重的工作。如果你熟悉 EJB 和 JMS,你应该会发现这些概念很熟悉,因为它的设计尽可能地接近 Spring 框架及其消息驱动 POJO(MDPs)的支持。

Spring Data 提供了两种实现,分别针对不同的编程模型进行定制:

  • StreamMessageListenerContainer 作为命令式编程模型的消息监听容器。它用于从 Redis Stream 中消费记录,并驱动注入其中的 StreamListener 实例。

  • StreamReceiver 提供了消息监听器的响应式变体。它用于从 Redis Stream 中消费消息,并将其作为潜在的无限流,通过 Flux 发出流消息。

StreamMessageListenerContainerStreamReceiver 负责消息接收和分派到监听器进行处理的所有线程管理。消息监听容器/接收器是消息驱动 POJO (MDP) 和消息提供者之间的中介,负责注册接收消息、资源获取和释放、异常转换等任务。这让作为应用开发者的你能够专注于编写与接收消息(并对其做出反应)相关的(可能复杂的)业务逻辑,而将 Redis 基础设施的样板代码委托给框架处理。

两个容器都允许运行时配置更改,这样你可以在应用程序运行时添加或删除订阅,而无需重启。此外,容器采用了一种惰性订阅的方法,仅在需要时使用 RedisConnection。如果所有监听器都取消了订阅,它会自动执行清理,并释放线程。

命令式 StreamMessageListenerContainer

类似于 EJB 世界中的消息驱动 Bean(Message-Driven Bean, MDB),流驱动 POJO(Stream-Driven POJO, SDP)充当流消息的接收者。SDP 的一个限制是它必须实现 StreamListener 接口。此外,请注意,如果你的 POJO 在多个线程上接收消息,确保你的实现是线程安全的非常重要。

class ExampleStreamListener implements StreamListener<String, MapRecord<String, String, String>> {

@Override
public void onMessage(MapRecord<String, String, String> message) {

System.out.println("MessageId: " + message.getId());
System.out.println("Stream: " + message.getStream());
System.out.println("Body: " + message.getValue());
}
}
java

StreamListener 表示一个函数式接口,因此其实现可以使用 Lambda 表达式形式重写:

message -> {

System.out.println("MessageId: " + message.getId());
System.out.println("Stream: " + message.getStream());
System.out.println("Body: " + message.getValue());
};
java

一旦你实现了 StreamListener,接下来就是创建一个消息监听容器并注册订阅:

RedisConnectionFactory connectionFactory =
StreamListener<String, MapRecord<String, String, String>> streamListener =

StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainerOptions
.builder().pollTimeout(Duration.ofMillis(100)).build();

StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(connectionFactory,
containerOptions);

Subscription subscription = container.receive(StreamOffset.fromStart("my-stream"), streamListener);
java

请参考各种消息监听器容器的 Javadoc,以获取每个实现所支持功能的完整描述。

响应式 StreamReceiver

流数据源的响应式消费通常通过事件的 Flux 或消息来实现。响应式接收器的实现由 StreamReceiver 及其重载的 receive(…) 方法提供。与 StreamMessageListenerContainer 相比,响应式方法所需的基础设施资源(如线程)更少,因为它利用了驱动程序提供的线程资源。接收流是 StreamMessage 的需求驱动发布者:

Flux<MapRecord<String, String, String>> messages =

return messages.doOnNext(it -> {
System.out.println("MessageId: " + message.getId());
System.out.println("Stream: " + message.getStream());
System.out.println("Body: " + message.getValue());
});
java

现在我们需要创建 StreamReceiver 并注册一个订阅以消费流消息:

ReactiveRedisConnectionFactory connectionFactory =

StreamReceiverOptions<String, MapRecord<String, String, String>> options = StreamReceiverOptions.builder().pollTimeout(Duration.ofMillis(100))
.build();
StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(connectionFactory, options);

Flux<MapRecord<String, String, String>> messages = receiver.receive(StreamOffset.fromStart("my-stream"));
java

请参考各种消息监听器容器的 Javadoc,以获取每个实现所支持功能的完整描述。

备注

需求驱动消费通过背压信号来激活和停用轮询。如果需求得到满足,StreamReceiver 订阅会暂停轮询,直到订阅者发出进一步的需求信号。根据 ReadOffset 策略的不同,这可能会导致消息被跳过。

Acknowledge 策略

当你通过 Consumer Group 读取消息时,服务器会记住某条消息已被传递,并将其添加到待处理条目列表(Pending Entries List, PEL)中。这是一个已传递但尚未确认的消息列表。
消息必须通过 StreamOperations.acknowledge 进行确认,才能从待处理条目列表中移除,如下面的代码片段所示。

StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = ...

container.receive(Consumer.from("my-group", "my-consumer"), 1
StreamOffset.create("my-stream", ReadOffset.lastConsumed()),
msg -> {

// ...
redisTemplate.opsForStream().acknowledge("my-group", msg); 2
});
java
  • 从组 my-group 中以 my-consumer 身份读取。收到的消息未被确认。

  • 在处理后确认了消息。

提示

要在接收消息时自动确认消息,请使用 receiveAutoAck 而不是 receive

ReadOffset 策略

流读取操作接受一个读取偏移量规范,用于从给定的偏移量开始消费消息。ReadOffset 表示读取偏移量规范。Redis 支持 3 种偏移量变体,具体取决于你是单独消费流还是在消费者组内消费流:

  • ReadOffset.latest() – 读取最新的消息。

  • ReadOffset.from(…) – 从特定的消息 ID 之后开始读取。

  • ReadOffset.lastConsumed() – 从最后消费的消息 ID 之后开始读取(仅限消费者组)。

在基于消息容器的消费场景中,我们消费消息时需要推进(或递增)读取偏移量。推进的方式取决于请求的 ReadOffset 和消费模式(是否使用消费者组)。以下矩阵解释了容器如何推进 ReadOffset

表 1. ReadOffset 前进

读取偏移量独立模式消费者组
最新读取最新消息读取最新消息
特定消息 ID使用最后看到的消息作为下一个消息 ID使用最后看到的消息作为下一个消息 ID

从特定的消息 ID 读取消息以及读取最后消费的消息可以被视为安全操作,确保了消费所有被追加到流中的消息。使用最新的消息进行读取可能会跳过在轮询操作处于“死区时间”时添加到流中的消息。轮询会引入一段死区时间,在此期间消息可能会在连续的轮询命令之间到达。流消费并不是线性的连续读取,而是被分割为重复的 XREAD 调用。

序列化

发送到流的任何记录都需要序列化为其二进制格式。由于流与哈希数据结构的紧密性,流键、字段名称和值使用在 RedisTemplate 上配置的相应序列化器。

表 2. 流序列化

流属性序列化器描述
keykeySerializer用于 Record#getStream()
fieldhashKeySerializer用于有效载荷中的每个映射键

请务必检查正在使用的 RedisSerializer,并注意如果您决定不使用任何序列化器,则需要确保这些值已经是二进制格式。

对象映射

简单值

StreamOperations 允许通过 ObjectRecord 将简单值直接附加到流中,而不必将这些值放入 Map 结构中。该值随后将被分配给一个 payload 字段,并在读取时可以提取出来。

ObjectRecord<String, String> record = StreamRecords.newRecord()
.in("my-stream")
.ofObject("my-value");

redisTemplate()
.opsForStream()
.add(record); 1

List<ObjectRecord<String, String>> records = redisTemplate()
.opsForStream()
.read(String.class, StreamOffset.fromStart("my-stream"));
java
  • XADD my-stream * "_class" "java.lang.String" "_raw" "my-value"

翻译为:

  • XADD my-stream * "_class" "java.lang.String" "_raw" "my-value"

ObjectRecord 与其他所有记录一样,都会经过相同的序列化过程,因此也可以通过返回 MapRecord 的无类型读取操作来获取该记录。

复数值

向流中添加复杂值可以通过以下 3 种方式实现:

  • 使用例如 JSON 字符串表示将值转换为简单值。

  • 使用合适的 RedisSerializer 序列化该值。

  • 使用 HashMapper 将值转换为适合序列化的 Map

第一种变体是最直接的,但它忽略了流结构提供的字段值功能,不过流中的值对其他消费者来说仍然是可读的。第二种选择与第一种具有相同的优点,但可能会导致非常特定的消费者限制,因为所有消费者都必须实现完全相同的序列化机制。HashMapper 方法稍微复杂一些,它利用了流的哈希结构,但扁平化了源数据。只要选择了合适的序列化器组合,其他消费者仍然能够读取记录。

备注

HashMappers 将有效负载转换为具有特定类型的 Map。请确保使用能够(反)序列化哈希的 Hash-Key 和 Hash-Value 序列化器。

ObjectRecord<String, User> record = StreamRecords.newRecord()
.in("user-logon")
.ofObject(new User("night", "angel"));

redisTemplate()
.opsForStream()
.add(record); 1

List<ObjectRecord<String, User>> records = redisTemplate()
.opsForStream()
.read(User.class, StreamOffset.fromStart("user-logon"));
java
  • XADD user-logon * "_class" "com.example.User" "firstname" "night" "lastname" "angel"

翻译为:

  • XADD user-logon * "_class" "com.example.User" "firstname" "night" "lastname" "angel"

StreamOperations 默认使用 ObjectHashMapper。在获取 StreamOperations 时,您可以提供一个适合您需求的 HashMapper

redisTemplate()
.opsForStream(new Jackson2HashMapper(true))
.add(record); 1
java
  • XADD user-logon * "firstname" "night" "@class" "com.example.User" "lastname" "angel"

备注

StreamMessageListenerContainer 可能无法感知域类型上使用的任何 @TypeAlias,因为这些需要通过 MappingContext 来解析。确保使用 initialEntitySet 初始化 RedisMappingContext

@Bean
RedisMappingContext redisMappingContext() {
RedisMappingContext ctx = new RedisMappingContext();
ctx.setInitialEntitySet(Collections.singleton(Person.class));
return ctx;
}

@Bean
RedisConverter redisConverter(RedisMappingContext mappingContext) {
return new MappingRedisConverter(mappingContext);
}

@Bean
ObjectHashMapper hashMapper(RedisConverter converter) {
return new ObjectHashMapper(converter);
}

@Bean
StreamMessageListenerContainer streamMessageListenerContainer(RedisConnectionFactory connectionFactory, ObjectHashMapper hashMapper) {
StreamMessageListenerContainerOptions<String, ObjectRecord<String, Object>> options = StreamMessageListenerContainerOptions.builder()
.objectMapper(hashMapper)
.build();

return StreamMessageListenerContainer.create(connectionFactory, options);
}
java