变更流
自 MongoDB 3.6 起,Change Streams 允许应用程序在不尾随 oplog 的情况下获取变更通知。
更改流(Change Stream)仅支持副本集(replica set)或分片集群(sharded cluster)。
Change Streams 可以通过命令式和响应式 MongoDB Java 驱动来消费。强烈建议使用响应式变体,因为它对资源的消耗较少。然而,如果无法使用响应式 API,你仍然可以通过使用 Spring 生态系统中已经普遍存在的消息传递概念来获取变更事件。
可以在集合级别和数据库级别进行监听,其中数据库级别的监听会发布数据库中所有集合的变更。当订阅数据库变更流时,请确保为事件类型使用合适的类型,因为跨不同实体类型的转换可能无法正确应用。如有疑问,请使用 Document
。
使用 MessageListener
的变更流
使用 同步驱动监听变更流 会创建一个长时间运行的阻塞任务,需要将其委托给一个单独的组件来处理。在这种情况下,我们首先需要创建一个 MessageListenerContainer,它将是运行特定 SubscriptionRequest
任务的主要入口点。Spring Data MongoDB 已经提供了一个默认实现,该实现基于 MongoTemplate
,并且能够为 ChangeStreamRequest 创建和运行 Task
实例。
以下示例展示了如何将 Change Streams 与 MessageListener
实例一起使用:
示例 1. 使用 MessageListener
实例的变更流
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start(); 1
MessageListener<ChangeStreamDocument<Document>, User> listener = System.out::println; 2
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("db", "user", ChangeStreamOptions.empty()); 3
Subscription subscription = container.register(new ChangeStreamRequest<>(listener, options), User.class); 4
// ...
container.stop(); 5
启动容器会初始化资源,并为已注册的
SubscriptionRequest
实例启动Task
实例。启动后添加的请求会立即执行。定义在接收到
Message
时调用的监听器。Message#getBody()
会被转换为请求的领域类型。使用Document
可以接收未经转换的原始结果。设置要监听的集合,并通过
ChangeStreamOptions
提供其他选项。注册请求。返回的
Subscription
可用于检查当前Task
的状态,并取消它以释放资源。一旦确定不再需要容器,请勿忘记停止它。这样做会停止容器内所有正在运行的
Task
实例。
处理过程中的错误会传递给 org.springframework.util.ErrorHandler
。如果未另行说明,默认情况下会应用一个日志附加的 ErrorHandler
。
请使用 register(request, body, errorHandler)
来提供额外的功能。
响应式变更流
使用响应式 API 订阅变更流是一种更为自然的方式来处理流。尽管如此,基本的构建模块,如 ChangeStreamOptions
,仍然保持不变。以下示例展示了如何使用变更流来发出 ChangeStreamEvent
:
示例 2. 发出 ChangeStreamEvent
的变更流
Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream(User.class) 1
.watchCollection("people")
.filter(where("age").gte(38)) 2
.listen(); 3
底层文档应转换为的事件目标类型。若省略此参数,将接收未经转换的原始结果。
使用聚合管道或仅使用查询
Criteria
来过滤事件。获取变更流事件的
Flux
。ChangeStreamEvent#getBody()
从 (2) 转换为请求的域类型。
恢复变更流
变更流(Change Streams)可以恢复并从你离开的地方继续发出事件。要恢复流,你需要提供一个恢复令牌(resume token)或最后已知的服务器时间(UTC 时间)。使用 ChangeStreamOptions 来相应地设置这些值。
以下示例展示了如何使用服务器时间设置恢复偏移量:
示例 3. 恢复变更流
Flux<ChangeStreamEvent<User>> resumed = template.changeStream(User.class)
.watchCollection("people")
.resumeAt(Instant.now().minusSeconds(1)) 1
.listen();
你可以通过
getTimestamp
方法获取ChangeStreamEvent
的服务器时间,或者使用通过getResumeToken
暴露的resumeToken
。
在某些情况下,当恢复变更流时,Instant
可能不是一个足够精确的度量。为此,请使用 MongoDB 原生的 BsonTimestamp。