跳到主要内容

可跟踪游标

DeepSeek V3 中英对照 Tailable Cursors

默认情况下,当客户端消耗完游标提供的所有结果时,MongoDB 会自动关闭游标。在消耗完时关闭游标会将流转换为有限流。对于 封顶集合(capped collections),你可以使用 可追加游标(Tailable Cursor),该游标在客户端消耗完最初返回的数据后仍保持打开状态。

提示

可以使用 MongoOperations.createCollection 创建上限集合(Capped Collections)。为此,需要提供必要的 CollectionOptions.empty().capped()…​

可追踪游标可以使用命令式和响应式的 MongoDB API 进行消费。强烈建议使用响应式变体,因为它对资源的消耗较少。然而,如果无法使用响应式 API,你仍然可以使用 Spring 生态系统中已经普遍存在的消息处理概念。

使用 MessageListener 的可尾游标

使用 Sync Driver 监听一个封顶集合(capped collection)会创建一个长时间运行的阻塞任务,这个任务需要委托给一个单独的组件来处理。在这种情况下,我们需要首先创建一个 MessageListenerContainer,它将是运行特定 SubscriptionRequest 的主要入口点。Spring Data MongoDB 已经提供了一个默认的实现,该实现基于 MongoTemplate,并且能够为 TailableCursorRequest 创建和运行 Task 实例。

以下示例展示了如何将可尾游标与 MessageListener 实例结合使用:

示例 1. 使用 MessageListener 实例的可追踪游标

MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start(); 1

MessageListener<Document, User> listener = System.out::println; 2

TailableCursorRequest request = TailableCursorRequest.builder()
.collection("orders") 3
.filter(query(where("value").lt(100))) 4
.publishTo(listener) 5
.build();

container.register(request, User.class); 6

// ...

container.stop(); 7
java
  • 启动容器会初始化资源,并为已注册的 SubscriptionRequest 实例启动 Task 实例。启动后添加的请求会立即执行。

  • 定义在接收到 Message 时调用的监听器。Message#getBody() 会被转换为请求的领域类型。使用 Document 可以接收未经转换的原始结果。

  • 设置要监听的集合。

  • 提供一个可选的过滤器,用于接收文档。

  • 设置消息监听器,用于发布传入的 Message

  • 注册请求。返回的 Subscription 可用于检查当前 Task 的状态,并取消它以释放资源。

  • 一旦确定不再需要容器,请务必停止它。这样做会停止容器内所有正在运行的 Task 实例。

响应式可追踪游标

使用可尾随游标(tailable cursors)与响应式数据类型相结合,可以构建无限流。可尾随游标会一直保持打开状态,直到外部将其关闭。当新文档到达有上限的集合(capped collection)时,可尾随游标会发出数据。

如果查询没有返回匹配项,或者游标返回集合“末尾”的文档并且应用程序随后删除了该文档,可跟踪游标可能会变为无效或失效。以下示例展示了如何创建和使用无限流查询:

示例 2. 使用 ReactiveMongoOperations 进行无限流查询

Flux<Person> stream = template.tail(query(where("name").is("Joe")), Person.class);

Disposable subscription = stream.doOnNext(person -> System.out.println(person)).subscribe();

// …

// Later: Dispose the subscription to close the stream
subscription.dispose();
java

Spring Data MongoDB 的响应式仓库通过使用 @Tailable 注解查询方法来支持无限流。这适用于返回 Flux 和其他能够发出多个元素的响应式类型的方法,如下例所示:

示例 3. 使用 ReactiveMongoRepository 进行无限流查询

public interface PersonRepository extends ReactiveMongoRepository<Person, String> {

@Tailable
Flux<Person> findByFirstname(String firstname);

}

Flux<Person> stream = repository.findByFirstname("Joe");

Disposable subscription = stream.doOnNext(System.out::println).subscribe();

// …

// Later: Dispose the subscription to close the stream
subscription.dispose();
java