跳到主要内容

变更流

DeepSeek V3 中英对照 Change Streams

自 MongoDB 3.6 起,Change Streams 允许应用程序在不尾随 oplog 的情况下获取变更通知。

备注

更改流(Change Stream)仅支持副本集(replica set)或分片集群(sharded cluster)。

Change Streams 可以通过命令式和响应式 MongoDB Java 驱动来消费。强烈建议使用响应式变体,因为它对资源的消耗较少。然而,如果无法使用响应式 API,你仍然可以通过使用 Spring 生态系统中已经普遍存在的消息传递概念来获取变更事件。

可以在集合级别和数据库级别进行监听,其中数据库级别的监听会发布数据库中所有集合的变更。当订阅数据库变更流时,请确保为事件类型使用合适的类型,因为跨不同实体类型的转换可能无法正确应用。如有疑问,请使用 Document

使用 MessageListener 的变更流

使用 同步驱动监听变更流 会创建一个长时间运行的阻塞任务,需要将其委托给一个单独的组件来处理。在这种情况下,我们首先需要创建一个 MessageListenerContainer,它将是运行特定 SubscriptionRequest 任务的主要入口点。Spring Data MongoDB 已经提供了一个默认实现,该实现基于 MongoTemplate,并且能够为 ChangeStreamRequest 创建和运行 Task 实例。

以下示例展示了如何将 Change Streams 与 MessageListener 实例一起使用:

示例 1. 使用 MessageListener 实例的变更流

MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start(); 1

MessageListener<ChangeStreamDocument<Document>, User> listener = System.out::println; 2
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("db", "user", ChangeStreamOptions.empty()); 3

Subscription subscription = container.register(new ChangeStreamRequest<>(listener, options), User.class); 4

// ...

container.stop(); 5
java
  • 启动容器会初始化资源,并为已注册的 SubscriptionRequest 实例启动 Task 实例。启动后添加的请求会立即执行。

  • 定义在接收到 Message 时调用的监听器。Message#getBody() 会被转换为请求的领域类型。使用 Document 可以接收未经转换的原始结果。

  • 设置要监听的集合,并通过 ChangeStreamOptions 提供其他选项。

  • 注册请求。返回的 Subscription 可用于检查当前 Task 的状态,并取消它以释放资源。

  • 一旦确定不再需要容器,请勿忘记停止它。这样做会停止容器内所有正在运行的 Task 实例。

备注

处理过程中的错误会传递给 org.springframework.util.ErrorHandler。如果未另行说明,默认情况下会应用一个日志附加的 ErrorHandler
请使用 register(request, body, errorHandler) 来提供额外的功能。

响应式变更流

使用响应式 API 订阅变更流是一种更为自然的方式来处理流。尽管如此,基本的构建模块,如 ChangeStreamOptions,仍然保持不变。以下示例展示了如何使用变更流来发出 ChangeStreamEvent

示例 2. 发出 ChangeStreamEvent 的变更流

Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream(User.class) 1
.watchCollection("people")
.filter(where("age").gte(38)) 2
.listen(); 3
java
  • 底层文档应转换为的事件目标类型。若省略此参数,将接收未经转换的原始结果。

  • 使用聚合管道或仅使用查询 Criteria 来过滤事件。

  • 获取变更流事件的 FluxChangeStreamEvent#getBody() 从 (2) 转换为请求的域类型。

恢复变更流

变更流(Change Streams)可以恢复并从你离开的地方继续发出事件。要恢复流,你需要提供一个恢复令牌(resume token)或最后已知的服务器时间(UTC 时间)。使用 ChangeStreamOptions 来相应地设置这些值。

以下示例展示了如何使用服务器时间设置恢复偏移量:

示例 3. 恢复变更流

Flux<ChangeStreamEvent<User>> resumed = template.changeStream(User.class)
.watchCollection("people")
.resumeAt(Instant.now().minusSeconds(1)) 1
.listen();
java
  • 你可以通过 getTimestamp 方法获取 ChangeStreamEvent 的服务器时间,或者使用通过 getResumeToken 暴露的 resumeToken

提示

在某些情况下,当恢复变更流时,Instant 可能不是一个足够精确的度量。为此,请使用 MongoDB 原生的 BsonTimestamp