变更流
自 MongoDB 3.6 起,Change Streams 允许应用程序在不尾随 oplog 的情况下获取变更通知。
更改流(Change Stream)仅支持副本集(replica set)或分片集群(sharded cluster)。
Change Streams 可以通过命令式和响应式 MongoDB Java 驱动来消费。强烈建议使用响应式变体,因为它对资源的消耗较少。然而,如果无法使用响应式 API,你仍然可以通过使用 Spring 生态系统中已经普遍存在的消息传递概念来获取变更事件。
可以在集合级别和数据库级别进行监听,其中数据库级别的监听会发布数据库中所有集合的变更。当订阅数据库变更流时,请确保为事件类型使用合适的类型,因为跨不同实体类型的转换可能无法正确应用。如有疑问,请使用 Document。
使用 MessageListener 的变更流
使用 同步驱动监听变更流 会创建一个长时间运行的阻塞任务,需要将其委托给一个单独的组件来处理。在这种情况下,我们首先需要创建一个 MessageListenerContainer,它将是运行特定 SubscriptionRequest 任务的主要入口点。Spring Data MongoDB 已经提供了一个默认实现,该实现基于 MongoTemplate,并且能够为 ChangeStreamRequest 创建和运行 Task 实例。
以下示例展示了如何将 Change Streams 与 MessageListener 实例一起使用:
示例 1. 使用 MessageListener 实例的变更流
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start();                                                                                              1
MessageListener<ChangeStreamDocument<Document>, User> listener = System.out::println;                           2
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("db", "user", ChangeStreamOptions.empty()); 3
Subscription subscription = container.register(new ChangeStreamRequest<>(listener, options), User.class);       4
// ...
container.stop();                                                                                               5
- 启动容器会初始化资源,并为已注册的 - SubscriptionRequest实例启动- Task实例。启动后添加的请求会立即执行。
- 定义在接收到 - Message时调用的监听器。- Message#getBody()会被转换为请求的领域类型。使用- Document可以接收未经转换的原始结果。
- 设置要监听的集合,并通过 - ChangeStreamOptions提供其他选项。
- 注册请求。返回的 - Subscription可用于检查当前- Task的状态,并取消它以释放资源。
- 一旦确定不再需要容器,请勿忘记停止它。这样做会停止容器内所有正在运行的 - Task实例。
处理过程中的错误会传递给 org.springframework.util.ErrorHandler。如果未另行说明,默认情况下会应用一个日志附加的 ErrorHandler。
请使用 register(request, body, errorHandler) 来提供额外的功能。
响应式变更流
使用响应式 API 订阅变更流是一种更为自然的方式来处理流。尽管如此,基本的构建模块,如 ChangeStreamOptions,仍然保持不变。以下示例展示了如何使用变更流来发出 ChangeStreamEvent:
示例 2. 发出 ChangeStreamEvent 的变更流
Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream(User.class) 1
    .watchCollection("people")
    .filter(where("age").gte(38))                                              2
    .listen();                                                                 3
- 底层文档应转换为的事件目标类型。若省略此参数,将接收未经转换的原始结果。 
- 使用聚合管道或仅使用查询 - Criteria来过滤事件。
- 获取变更流事件的 - Flux。- ChangeStreamEvent#getBody()从 (2) 转换为请求的域类型。
恢复变更流
变更流(Change Streams)可以恢复并从你离开的地方继续发出事件。要恢复流,你需要提供一个恢复令牌(resume token)或最后已知的服务器时间(UTC 时间)。使用 ChangeStreamOptions 来相应地设置这些值。
以下示例展示了如何使用服务器时间设置恢复偏移量:
示例 3. 恢复变更流
Flux<ChangeStreamEvent<User>> resumed = template.changeStream(User.class)
    .watchCollection("people")
    .resumeAt(Instant.now().minusSeconds(1)) 1
    .listen();
- 你可以通过 - getTimestamp方法获取- ChangeStreamEvent的服务器时间,或者使用通过- getResumeToken暴露的- resumeToken。
在某些情况下,当恢复变更流时,Instant 可能不是一个足够精确的度量。为此,请使用 MongoDB 原生的 BsonTimestamp。