跳到主要内容

会话与事务

DeepSeek V3 中英对照 Sessions & Transactions

从 3.6 版本开始,MongoDB 支持会话的概念。使用会话使得 MongoDB 的因果一致性模型成为可能,该模型保证操作按照其因果关系顺序执行。这些会话分为 ServerSession 实例和 ClientSession 实例。在本节中,当我们提到会话时,指的是 ClientSession

注意

客户端会话内的操作并不与会话外的操作隔离。

MongoOperationsReactiveMongoOperations 都提供了将 ClientSession 绑定到操作的网关方法。MongoCollectionMongoDatabase 使用实现了 MongoDB 集合和数据库接口的会话代理对象,因此你无需在每次调用时手动添加会话。这意味着对 MongoCollection#find() 的潜在调用会被委托给 MongoCollection#find(ClientSession)

备注

诸如 (Reactive)MongoOperations#getCollection 之类的方法返回原生的 MongoDB Java 驱动网关对象(例如 MongoCollection),这些对象本身提供了专门用于 ClientSession 的方法。这些方法不会被会话代理。当你直接与 MongoCollectionMongoDatabase 交互而不是通过 MongoOperations 上的某个 #execute 回调时,应在需要的地方提供 ClientSession

客户端会话支持

以下示例展示了会话的使用方式:

ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
.causallyConsistent(true)
.build();

ClientSession session = client.startSession(sessionOptions); 1

template.withSession(() -> session)
.execute(action -> {

Query query = query(where("name").is("Durzo Blint"));
Person durzo = action.findOne(query, Person.class); 2

Person azoth = new Person("Kylar Stern");
azoth.setMaster(durzo);

action.insert(azoth); 3

return azoth;
});

session.close() 4
java
  • 从服务器获取一个新的会话。

  • 像以前一样使用 MongoOperation 方法。ClientSession 会自动应用。

  • 确保关闭 ClientSession

  • 关闭会话。

注意

当处理 DBRef 实例时,特别是延迟加载的实例,不要在所有数据加载完成之前关闭 ClientSession。否则,延迟获取会失败。

MongoDB 事务

从版本 4 开始,MongoDB 支持事务。事务建立在会话之上,因此需要一个活动的 ClientSession

备注

除非你在应用程序上下文中指定一个 MongoTransactionManager,否则事务支持将被禁用。你可以使用 setSessionSynchronization(ALWAYS) 来参与正在进行的非原生 MongoDB 事务。

为了获得对事务的完全编程控制,您可能希望在 MongoOperations 上使用会话回调。

以下示例展示了程序化的事务控制:

ClientSession session = client.startSession(options);                   1

template.withSession(session)
.execute(action -> {

session.startTransaction(); 2

try {

Step step = // ...;
action.insert(step);

process(step);

action.update(Step.class).apply(Update.set("state", // ...

session.commitTransaction(); 3

} catch (RuntimeException e) {
session.abortTransaction(); 4
}
}, ClientSession::close) 5
java
  • 获取一个新的 ClientSession

  • 启动事务。

  • 如果一切按预期进行,提交更改。

  • 如果出现问题,回滚所有操作。

  • 完成后不要忘记关闭会话。

上述示例让你在使用会话范围内的 MongoOperations 实例时完全控制事务行为,以确保会话传递给每个服务器调用。为了避免这种方法带来的一些开销,你可以使用 TransactionTemplate 来减少手动事务流的复杂性。

使用 TransactionTemplate / TransactionalOperator 进行事务处理

Spring Data MongoDB 事务支持 TransactionTemplateTransactionalOperator

template.setSessionSynchronization(ALWAYS);                                     1

// ...

TransactionTemplate txTemplate = new TransactionTemplate(anyTxManager); 2

txTemplate.execute(new TransactionCallbackWithoutResult() {

@Override
protected void doInTransactionWithoutResult(TransactionStatus status) { 3

Step step = // ...;
template.insert(step);

process(step);

template.update(Step.class).apply(Update.set("state", // ...
}
});
java
  • 在 Template API 配置中启用事务同步。

  • 使用提供的 PlatformTransactionManager 创建 TransactionTemplate

  • 在回调中,ClientSession 和事务已经注册。

警告

在运行时更改 MongoTemplate 的状态(如您可能认为在前面的列表项 1 中可行)可能会导致线程和可见性问题。

使用 MongoTransactionManager 和 ReactiveMongoTransactionManager 进行事务处理

MongoTransactionManager / ReactiveMongoTransactionManager 是通往著名的 Spring 事务支持的入口。它允许应用程序使用 Spring 的托管事务功能MongoTransactionManagerClientSession 绑定到线程,而 ReactiveMongoTransactionManager 则使用 ReactorContext 来实现这一点。MongoTemplate 会检测到会话并相应地操作与事务关联的这些资源。MongoTemplate 也可以参与到其他正在进行的事务中。以下示例展示了如何使用 MongoTransactionManager 创建和使用事务:

@Configuration
static class Config extends AbstractMongoClientConfiguration {

@Bean
MongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory) { 1
return new MongoTransactionManager(dbFactory);
}

// ...
}

@Component
public class StateService {

@Transactional
void someBusinessFunction(Step step) { 2

template.insert(step);

process(step);

template.update(Step.class).apply(Update.set("state", // ...
};
});
java
  • 在应用上下文中注册 MongoTransactionManager

  • 将方法标记为事务性。

备注

@Transactional(readOnly = true) 建议 MongoTransactionManager 也启动一个事务,将 ClientSession 添加到传出请求中。

控制 MongoDB 特定的事务选项

事务性服务方法可能需要特定的事务选项来运行事务。Spring Data MongoDB 的事务管理器支持评估事务标签,例如 @Transactional(label = { "mongo:readConcern=available" })

默认情况下,使用 mongo: 前缀的标签命名空间由默认配置的 MongoTransactionOptionsResolver 进行评估。事务标签由 TransactionAttribute 提供,并且可以通过 TransactionTemplateTransactionalOperator 进行编程式事务控制。由于其声明性特性,@Transactional(label = …) 提供了一个良好的起点,同时也可以作为文档使用。

目前,支持以下选项:

最大提交时间

控制服务器上 commitTransaction 操作的最大执行时间。该值的格式对应于 ISO-8601 持续时间格式,与 Duration.parse(…) 使用的格式相同。

用法:mongo:maxCommitTime=PT1S

读关注(Read Concern)

设置事务的读关注级别。

用法:mongo:readConcern=LOCAL|MAJORITY|LINEARIZABLE|SNAPSHOT|AVAILABLE

读取偏好

设置事务的读取偏好。

用法:mongo:readPreference=PRIMARY|SECONDARY|SECONDARY_PREFERRED|PRIMARY_PREFERRED|NEAREST

写入确认(Write Concern)

设置事务的写入关注级别。

用法:mongo:writeConcern=ACKNOWLEDGED|W1|W2|W3|UNACKNOWLEDGED|JOURNALED|MAJORITY

备注

参与外部事务的嵌套事务不会影响初始事务的选项,因为事务已经启动。事务选项仅在启动新事务时应用。

事务中的特殊行为

在事务中,MongoDB 服务器的行为略有不同。

连接设置

MongoDB 驱动程序提供了一个专门的副本集名称配置选项,将驱动程序切换为自动检测模式。该选项有助于在事务期间识别主副本集节点并处理命令路由。

备注

请确保在 MongoDB URI 中添加 replicaSet。更多详细信息,请参考连接字符串选项

集合操作

MongoDB 支持在事务中执行集合操作,例如集合创建。这也影响了首次使用时动态创建集合的行为。因此,请确保所有所需的结构都已准备就绪。

瞬态错误

MongoDB 可以在事务操作期间引发的错误上添加特殊标签。这些标签可能表示一些暂时性故障,仅仅通过重试操作就可能会消失。为此,我们强烈推荐使用 Spring Retry。不过,你也可以通过重写 MongoTransactionManager#doCommit(MongoTransactionObject) 方法来实现 重试提交操作 的行为,如 MongoDB 参考手册中所述。

计数

MongoDB 的 count 操作基于集合的统计信息,这些信息可能无法反映事务中的实际情况。当在多文档事务中执行 count 命令时,服务器会返回 错误 50851。一旦 MongoTemplate 检测到活动的事务,所有公开的 count() 方法都会被转换并委托给聚合框架,使用 $match$count 操作符,同时保留 Query 的设置,例如 collation

在聚合计数助手(aggregation count helper)中使用地理命令时存在一些限制。以下操作符不可使用,必须替换为其他操作符:

  • $where$expr

  • $near$geoWithin$center 结合使用

  • $nearSphere$geoWithin$centerSphere 结合使用

使用 Criteria.near(…)Criteria.nearSphere(…) 的查询必须分别重写为 Criteria.within(…)Criteria.withinSphere(…)。同样适用于仓库查询方法中的 near 查询关键字,必须更改为 within。有关更多参考,请参阅 MongoDB JIRA 工单 DRIVERS-518

以下代码片段展示了在会话绑定闭包中使用 count 的情况:

session.startTransaction();

template.withSession(session)
.execute(action -> {
action.count(query(where("state").is("active")), Step.class)
...
javascript

上面的代码片段会生成以下命令:

db.collection.aggregate(
[
{ $match: { state: "active" } },
{ $count: "totalEntityCount" }
]
)
javascript

代替:

db.collection.find( { state: "active" } ).count()
javascript