会话与事务
从 3.6 版本开始,MongoDB 支持会话的概念。使用会话使得 MongoDB 的因果一致性模型成为可能,该模型保证操作按照其因果关系顺序执行。这些会话分为 ServerSession 实例和 ClientSession 实例。在本节中,当我们提到会话时,指的是 ClientSession。
客户端会话内的操作并不与会话外的操作隔离。
MongoOperations 和 ReactiveMongoOperations 都提供了将 ClientSession 绑定到操作的网关方法。MongoCollection 和 MongoDatabase 使用实现了 MongoDB 集合和数据库接口的会话代理对象,因此你无需在每次调用时手动添加会话。这意味着对 MongoCollection#find() 的潜在调用会被委托给 MongoCollection#find(ClientSession)。
诸如 (Reactive)MongoOperations#getCollection 之类的方法返回原生的 MongoDB Java 驱动网关对象(例如 MongoCollection),这些对象本身提供了专门用于 ClientSession 的方法。这些方法不会被会话代理。当你直接与 MongoCollection 或 MongoDatabase 交互而不是通过 MongoOperations 上的某个 #execute 回调时,应在需要的地方提供 ClientSession。
客户端会话支持
以下示例展示了会话的使用方式:
- 命令式
- 响应式
ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
    .causallyConsistent(true)
    .build();
ClientSession session = client.startSession(sessionOptions); 1
template.withSession(() -> session)
    .execute(action -> {
        Query query = query(where("name").is("Durzo Blint"));
        Person durzo = action.findOne(query, Person.class);  2
        Person azoth = new Person("Kylar Stern");
        azoth.setMaster(durzo);
        action.insert(azoth);                                3
        return azoth;
    });
session.close()                                              4
- 从服务器获取一个新的会话。 
- 像以前一样使用 - MongoOperation方法。- ClientSession会自动应用。
- 确保关闭 - ClientSession。
- 关闭会话。 
当处理 DBRef 实例时,特别是延迟加载的实例,不要在所有数据加载完成之前关闭 ClientSession。否则,延迟获取会失败。
ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
.causallyConsistent(true)
.build();
Publisher<ClientSession> session = client.startSession(sessionOptions); 1
template.withSession(session)
.execute(action -> {
        Query query = query(where("name").is("Durzo Blint"));
        return action.findOne(query, Person.class)
            .flatMap(durzo -> {
                Person azoth = new Person("Kylar Stern");
                azoth.setMaster(durzo);
                return action.insert(azoth);                            2
            });
    }, ClientSession::close)                                            3
    .subscribe();                                                       4
- 获取用于新会话检索的 - Publisher。
- 像以前一样使用 - ReactiveMongoOperation方法。- ClientSession会自动获取并应用。
- 确保关闭 - ClientSession。
- 在订阅之前,什么都不会发生。详情请参阅 Project Reactor 参考指南。 
通过使用提供实际会话的 Publisher,你可以将会话获取延迟到实际订阅时。不过,你仍然需要在完成时关闭会话,以免服务器上留下过期的会话。使用 execute 上的 doFinally 钩子在不再需要会话时调用 ClientSession#close()。如果你希望对会话本身有更多的控制,可以通过驱动程序获取 ClientSession,并通过 Supplier 提供它。
ClientSession 的响应式使用仅限于模板 API。目前还没有与响应式存储库的会话集成。
MongoDB 事务
除非你在应用程序上下文中指定一个 MongoTransactionManager,否则事务支持将被禁用。你可以使用 setSessionSynchronization(ALWAYS) 来参与正在进行的非原生 MongoDB 事务。
为了获得对事务的完全编程控制,您可能希望在 MongoOperations 上使用会话回调。
以下示例展示了程序化的事务控制:
- 命令式
- 响应式
ClientSession session = client.startSession(options);                   1
template.withSession(session)
    .execute(action -> {
        session.startTransaction();                                     2
        try {
            Step step = // ...;
            action.insert(step);
            process(step);
            action.update(Step.class).apply(Update.set("state", // ...
            session.commitTransaction();                                3
        } catch (RuntimeException e) {
            session.abortTransaction();                                 4
        }
    }, ClientSession::close)                                            5
- 获取一个新的 - ClientSession。
- 启动事务。 
- 如果一切按预期进行,提交更改。 
- 如果出现问题,回滚所有操作。 
- 完成后不要忘记关闭会话。 
上述示例让你在使用会话范围内的 MongoOperations 实例时完全控制事务行为,以确保会话传递给每个服务器调用。为了避免这种方法带来的一些开销,你可以使用 TransactionTemplate 来减少手动事务流的复杂性。
Mono<DeleteResult> result = Mono
    .from(client.startSession())                                                             1
    .flatMap(session -> {
        session.startTransaction();                                                          2
        return Mono.from(collection.deleteMany(session, ...))                                3
            .onErrorResume(e -> Mono.from(session.abortTransaction()).then(Mono.error(e)))   4
            .flatMap(val -> Mono.from(session.commitTransaction()).then(Mono.just(val)))     5
            .doFinally(signal -> session.close());                                           6
      });
- 首先,我们显然需要启动会话。 
- 一旦我们有了 - ClientSession,就启动事务。
- 通过将 - ClientSession传递给操作,在事务内进行操作。
- 如果操作异常完成,我们需要停止事务并保留错误。 
- 或者在成功的情况下提交更改,同时保留操作结果。 
- 最后,我们需要确保关闭会话。 
上述操作的症结在于保持主流的 DeleteResult,而不是通过 commitTransaction() 或 abortTransaction() 发布的事务结果,这导致了一个相当复杂的设置。
除非你在应用程序上下文中指定 ReactiveMongoTransactionManager,否则事务支持是禁用的。你可以使用 setSessionSynchronization(ALWAYS) 来参与正在进行的非原生 MongoDB 事务。
使用 TransactionTemplate / TransactionalOperator 进行事务处理
Spring Data MongoDB 事务支持 TransactionTemplate 和 TransactionalOperator。
- Imperative
- Reactive
template.setSessionSynchronization(ALWAYS);                                     1
// ...
TransactionTemplate txTemplate = new TransactionTemplate(anyTxManager);         2
txTemplate.execute(new TransactionCallbackWithoutResult() {
    @Override
    protected void doInTransactionWithoutResult(TransactionStatus status) {     3
        Step step = // ...;
        template.insert(step);
        process(step);
        template.update(Step.class).apply(Update.set("state", // ...
    }
});
- 在 Template API 配置中启用事务同步。 
- 使用提供的 - PlatformTransactionManager创建- TransactionTemplate。
- 在回调中, - ClientSession和事务已经注册。
在运行时更改 MongoTemplate 的状态(如您可能认为在前面的列表项 1 中可行)可能会导致线程和可见性问题。
template.setSessionSynchronization(ALWAYS);                                          1
// ...
TransactionalOperator rxtx = TransactionalOperator.create(anyTxManager,
                                   new DefaultTransactionDefinition());              2
Step step = // ...;
template.insert(step);
Mono<Void> process(step)
    .then(template.update(Step.class).apply(Update.set("state", …))
    .as(rxtx::transactional)                                                         3
    .then();
- 为事务参与启用事务同步。 
- 使用提供的 - ReactiveTransactionManager创建- TransactionalOperator。
- TransactionalOperator.transactional(…)为所有上游操作提供事务管理。
使用 MongoTransactionManager 和 ReactiveMongoTransactionManager 进行事务处理
MongoTransactionManager / ReactiveMongoTransactionManager 是通往著名的 Spring 事务支持的入口。它允许应用程序使用 Spring 的托管事务功能。MongoTransactionManager 将 ClientSession 绑定到线程,而 ReactiveMongoTransactionManager 则使用 ReactorContext 来实现这一点。MongoTemplate 会检测到会话并相应地操作与事务关联的这些资源。MongoTemplate 也可以参与到其他正在进行的事务中。以下示例展示了如何使用 MongoTransactionManager 创建和使用事务:
- 命令式
- 响应式
@Configuration
static class Config extends AbstractMongoClientConfiguration {
    @Bean
    MongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory) {  1
        return new MongoTransactionManager(dbFactory);
    }
    // ...
}
@Component
public class StateService {
    @Transactional
    void someBusinessFunction(Step step) {                                        2
        template.insert(step);
        process(step);
        template.update(Step.class).apply(Update.set("state", // ...
    };
});
- 在应用上下文中注册 - MongoTransactionManager。
- 将方法标记为事务性。 
@Transactional(readOnly = true) 建议 MongoTransactionManager 也启动一个事务,将 ClientSession 添加到传出请求中。
@Configuration
public class Config extends AbstractReactiveMongoConfiguration {
    @Bean
    ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory factory) {  1
        return new ReactiveMongoTransactionManager(factory);
    }
    // ...
}
@Service
public class StateService {
    @Transactional
    Mono<UpdateResult> someBusinessFunction(Step step) {                                  2
        return template.insert(step)
            .then(process(step))
            .then(template.update(Step.class).apply(Update.set("state", …));
    };
});
- 在应用上下文中注册 - ReactiveMongoTransactionManager。
- 将方法标记为事务性。 
@Transactional(readOnly = true) 建议 ReactiveMongoTransactionManager 也启动一个事务,将 ClientSession 添加到传出请求中。
控制 MongoDB 特定的事务选项
事务性服务方法可能需要特定的事务选项来运行事务。Spring Data MongoDB 的事务管理器支持评估事务标签,例如 @Transactional(label = { "mongo:readConcern=available" })。
默认情况下,使用 mongo: 前缀的标签命名空间由默认配置的 MongoTransactionOptionsResolver 进行评估。事务标签由 TransactionAttribute 提供,并且可以通过 TransactionTemplate 和 TransactionalOperator 进行编程式事务控制。由于其声明性特性,@Transactional(label = …) 提供了一个良好的起点,同时也可以作为文档使用。
目前,支持以下选项:
最大提交时间
控制服务器上 commitTransaction 操作的最大执行时间。该值的格式对应于 ISO-8601 持续时间格式,与 Duration.parse(…) 使用的格式相同。
用法:mongo:maxCommitTime=PT1S
读关注(Read Concern)
设置事务的读关注级别。
用法:mongo:readConcern=LOCAL|MAJORITY|LINEARIZABLE|SNAPSHOT|AVAILABLE
读取偏好
设置事务的读取偏好。
用法:mongo:readPreference=PRIMARY|SECONDARY|SECONDARY_PREFERRED|PRIMARY_PREFERRED|NEAREST
写入确认(Write Concern)
设置事务的写入关注级别。
用法:mongo:writeConcern=ACKNOWLEDGED|W1|W2|W3|UNACKNOWLEDGED|JOURNALED|MAJORITY
参与外部事务的嵌套事务不会影响初始事务的选项,因为事务已经启动。事务选项仅在启动新事务时应用。
事务中的特殊行为
在事务中,MongoDB 服务器的行为略有不同。
连接设置
MongoDB 驱动程序提供了一个专门的副本集名称配置选项,将驱动程序切换为自动检测模式。该选项有助于在事务期间识别主副本集节点并处理命令路由。
请确保在 MongoDB URI 中添加 replicaSet。更多详细信息,请参考连接字符串选项。
集合操作
MongoDB 不支持在事务中执行集合操作,例如集合创建。这也影响了首次使用时动态创建集合的行为。因此,请确保所有所需的结构都已准备就绪。
瞬态错误
MongoDB 可以在事务操作期间引发的错误上添加特殊标签。这些标签可能表示一些暂时性故障,仅仅通过重试操作就可能会消失。为此,我们强烈推荐使用 Spring Retry。不过,你也可以通过重写 MongoTransactionManager#doCommit(MongoTransactionObject) 方法来实现 重试提交操作 的行为,如 MongoDB 参考手册中所述。
计数
MongoDB 的 count 操作基于集合的统计信息,这些信息可能无法反映事务中的实际情况。当在多文档事务中执行 count 命令时,服务器会返回 错误 50851。一旦 MongoTemplate 检测到活动的事务,所有公开的 count() 方法都会被转换并委托给聚合框架,使用 $match 和 $count 操作符,同时保留 Query 的设置,例如 collation。
在聚合计数助手(aggregation count helper)中使用地理命令时存在一些限制。以下操作符不可使用,必须替换为其他操作符:
- 
$where→$expr
- 
$near→$geoWithin与$center结合使用
- 
$nearSphere→$geoWithin与$centerSphere结合使用
使用 Criteria.near(…) 和 Criteria.nearSphere(…) 的查询必须分别重写为 Criteria.within(…) 和 Criteria.withinSphere(…)。同样适用于仓库查询方法中的 near 查询关键字,必须更改为 within。有关更多参考,请参阅 MongoDB JIRA 工单 DRIVERS-518。
以下代码片段展示了在会话绑定闭包中使用 count 的情况:
session.startTransaction();
template.withSession(session)
    .execute(action -> {
        action.count(query(where("state").is("active")), Step.class)
        ...
上面的代码片段会生成以下命令:
db.collection.aggregate(
   [
      { $match: { state: "active" } },
      { $count: "totalEntityCount" }
   ]
)
代替:
db.collection.find( { state: "active" } ).count()