持久化实体
CassandraTemplate
类(及其响应式变体 ReactiveCassandraTemplate
),位于 org.springframework.data.cassandra
包中,是 Spring Cassandra 支持的核心类,并提供了丰富的功能集来与数据库交互。该模板提供了便捷的操作,用于创建、更新、删除和查询 Cassandra,并提供了将你的领域对象与 Cassandra 表中的行进行映射的功能。
一旦配置,模板实例是线程安全的,可以在多个实例之间重用。
Cassandra 中行与应用程序领域类之间的映射是通过委托给 CassandraConverter
接口的实现来完成的。Spring 提供了一个默认实现 MappingCassandraConverter
,但您也可以编写自己的自定义转换器。有关更详细的信息,请参见 Cassandra 转换 部分。
CassandraTemplate
类实现了 CassandraOperations
接口,而其响应式变体 ReactiveCassandraTemplate
实现了 ReactiveCassandraOperations
。尽可能地,[Reactive]CassandraOperations
上的方法都以 Cassandra 中可用的方法命名,以便使 API 对于已经熟悉 Cassandra 的开发者更为熟悉。
例如,您可以找到诸如 select
、insert
、delete
和 update
的方法。设计目标是尽可能简化在使用基础 Cassandra 驱动程序和 [Reactive]CassandraOperations
之间的过渡。这两个 API 之间的一个主要区别是 CassandraOperations
可以传递域对象,而不是 CQL 和查询对象。
引用 [Reactive]CassandraTemplate
实例上的操作的首选方式是通过 [Reactive]CassandraOperations
接口。
[Reactive]CassandraTemplate
使用的默认转换器实现是 MappingCassandraConverter
。虽然 MappingCassandraConverter
可以使用额外的元数据来指定对象到行的映射,但它也可以通过使用一些约定来转换不包含额外元数据的对象,这些约定用于字段和表名的映射。这些约定以及映射注解的使用在 “Mapping” 章节 中有详细说明。
另一个 [Reactive]CassandraTemplate
的核心功能是将 Cassandra Java 驱动抛出的异常转换为 Spring 的可移植数据访问异常层次结构。有关更多信息,请参阅 异常转换 部分。
模板 API 有不同的执行模型风格。基本的 CassandraTemplate
使用阻塞(命令式同步)执行模型。您可以使用 AsyncCassandraTemplate
进行异步执行,并与 ListenableFuture
实例同步,或者使用 ReactiveCassandraTemplate
进行响应式执行。
实例化 CassandraTemplate
CassandraTemplate
应始终作为 Spring bean 配置,尽管我们之前展示了一个示例,其中可以直接实例化它。然而,由于我们假设是在构建一个 Spring 模块的上下文中,我们假设存在 Spring 容器。
有两种方法可以获取 CassandraTemplate
,具体取决于你如何加载 Spring ApplicationContext
:
自动装配
您可以将 [Reactive]CassandraOperations
自动装配到您的项目中,如下例所示:
- Imperative
- Reactive
@Autowired
private CassandraOperations cassandraOperations;
@Autowired
private ReactiveCassandraOperations reactiveCassandraOperations;
与所有 Spring 自动装配一样,这假设在 ApplicationContext
中只有一个类型为 [Reactive]CassandraOperations
的 bean。如果您有多个 [Reactive]CassandraTemplate
bean(如果您在同一个项目中处理多个 keyspace,则会出现这种情况),那么您可以使用 @Qualifier
注解来指定您想要自动装配的 bean。
- Imperative
- Reactive
@Autowired
@Qualifier("keyspaceOneTemplateBeanId")
private CassandraOperations cassandraOperations;
@Autowired
@Qualifier("keyspaceOneTemplateBeanId")
private ReactiveCassandraOperations reactiveCassandraOperations;
使用 ApplicationContext
进行 Bean 查找
您还可以从 ApplicationContext
中查找 [Reactive]CassandraTemplate
bean,如下例所示:
- Imperative
- Reactive
CassandraOperations cassandraOperations = applicationContext.getBean("cassandraTemplate", CassandraOperations.class);
ReactiveCassandraOperations cassandraOperations = applicationContext.getBean("ReactiveCassandraOperations", ReactiveCassandraOperations.class);
查询行
你可以通过使用 Query
和 Criteria
类来表达你的查询,这些类的方法名称反映了原生 Cassandra 谓词运算符的名称,如 lt
、lte
、is
等。
Query
和 Criteria
类遵循流式 API 风格,以便您可以轻松地将多个方法标准和查询链接在一起,同时保持易于理解的代码。在 Java 中创建 Query
和 Criteria
实例时使用静态导入,以提高可读性。
查询表中的行
在前面的部分,我们看到如何通过在 [Reactive]CassandraTemplate
上使用 selectOneById
方法来检索单个对象。这样做会返回一个单一的领域对象。我们还可以查询一组行,以返回一个领域对象的列表。假设我们有多个 Person
对象,其名称和年龄值存储为表中的行,并且每个人都有一个账户余额,我们现在可以使用以下代码运行查询:
- Imperative
- Reactive
import static org.springframework.data.cassandra.core.query.Criteria.where;
import static org.springframework.data.cassandra.core.query.Query.query;
…
List<Person> result = cassandraTemplate.select(query(where("age").is(50))
.and(where("balance").gt(1000.00d)).withAllowFiltering(), Person.class);
import static org.springframework.data.cassandra.core.query.Criteria.where;
import static org.springframework.data.cassandra.core.query.Query.query;
…
Flux<Person> result = reactiveCassandraTemplate.select(query(where("age").is(50))
.and(where("balance").gt(1000.00d)).withAllowFiltering(), Person.class);
select
、selectOne
和 stream
方法接受一个 Query
对象作为参数。这个对象定义了用于执行查询的标准和选项。标准是通过使用一个 Criteria
对象来指定的,该对象有一个名为 where
的静态工厂方法,用于实例化一个新的 Criteria
对象。我们建议使用静态导入 org.springframework.data.cassandra.core.query.Criteria.where
和 Query.query
,以提高查询的可读性。
此查询应返回满足指定条件的 Person
对象列表。Criteria
类具有以下方法,这些方法对应于 Apache Cassandra 中提供的操作符:
Criteria 类的方法
-
CriteriaDefinition
gt(Object value)
:通过使用>
运算符创建一个标准。 -
CriteriaDefinition
gte(Object value)
:通过使用>=
运算符创建一个标准。 -
CriteriaDefinition
in(Object… values)
:通过使用IN
运算符为可变参数创建一个标准。 -
CriteriaDefinition
in(Collection<?> collection)
:通过使用IN
运算符使用集合创建一个标准。 -
CriteriaDefinition
is(Object value)
:通过使用字段匹配(column = value
)创建一个标准。 -
CriteriaDefinition
lt(Object value)
:通过使用<
运算符创建一个标准。 -
CriteriaDefinition
lte(Object value)
:通过使用⇐
运算符创建一个标准。 -
CriteriaDefinition
like(Object value)
:通过使用LIKE
运算符创建一个标准。 -
CriteriaDefinition
contains(Object value)
:通过使用CONTAINS
运算符创建一个标准。 -
CriteriaDefinition
containsKey(Object key)
:通过使用CONTAINS KEY
运算符创建一个标准。
Criteria
一旦创建后是不可变的。
Query 类的方法
Query
类有一些附加方法,你可以使用它们为查询提供选项:
-
Query
by(CriteriaDefinition… criteria)
:用于创建一个Query
对象。 -
Query
and(CriteriaDefinition criteria)
:用于向查询中添加额外的条件。 -
Query
columns(Columns columns)
:用于定义查询结果中要包含的列。 -
Query
limit(Limit limit)
:用于限制返回结果的大小,限制值由提供的limit
参数确定(用于SELECT
限制)。 -
Query
limit(long limit)
:用于限制返回结果的大小,限制值由提供的limit
参数确定(用于SELECT
限制)。 -
Query
pageRequest(Pageable pageRequest)
:用于将Sort
、PagingState
和fetchSize
与查询关联(用于分页)。 -
Query
pagingState(ByteBuffer pagingState)
:用于将ByteBuffer
与查询关联(用于分页)。 -
Query
queryOptions(QueryOptions queryOptions)
:用于将QueryOptions
与查询关联。 -
Query
sort(Sort sort)
:用于为查询结果提供排序定义。 -
Query
withAllowFiltering()
:用于渲染ALLOW FILTERING
查询。
Query
一旦创建是不可变的。调用方法会创建新的不可变(中间)Query
对象。
查询行的方法
Query
类具有以下返回行的方法:
-
List<T>
select(Query query, Class<T> entityClass)
:从表中查询类型为T
的对象列表。 -
T
selectOne(Query query, Class<T> entityClass)
:从表中查询一个类型为T
的对象。 -
Slice<T>
slice(Query query, Class<T> entityClass)
:通过查询类型为T
的对象的Slice
来开始或继续分页。 -
Stream<T>
stream(Query query, Class<T> entityClass)
:从表中查询类型为T
的对象流。 -
List<T>
select(String cql, Class<T> entityClass)
:通过提供 CQL 语句进行临时查询,获取类型为T
的对象列表。 -
T
selectOne(String cql, Class<T> entityClass)
:通过提供 CQL 语句进行临时查询,获取一个类型为T
的对象。 -
Stream<T>
stream(String cql, Class<T> entityClass)
:通过提供 CQL 语句进行临时查询,获取类型为T
的对象流。
查询方法必须指定返回的目标类型 T
。
Fluent Template API
[Reactive]CassandraOperations
接口是与 Apache Cassandra 进行更低级别交互时的核心组件之一。它提供了广泛的方法。您可以找到每个方法的多个重载。它们大多数涵盖了 API 的可选(可为 null)部分。
FluentCassandraOperations
及其响应式变体 ReactiveFluentCassandraOperations
为 [Reactive]CassandraOperations
的常用方法提供了更窄的接口,提供了更可读的流式 API。入口点(query(…)
、insert(…)
、update(…)
和 delete(…)
)遵循基于要执行的操作的自然命名方案。从入口点开始,API 旨在仅提供上下文相关的方法,引导开发人员朝向一个终止方法,该方法调用实际的 [Reactive]CassandraOperations
。以下示例展示了流式 API:
- 命令式
- 反应式
List<SWCharacter> all = ops.query(SWCharacter.class)
.inTable("star_wars") 1
.all();
如果
SWCharacter
使用@Table
定义了表名,或者使用类名作为表名没有问题,则跳过此步骤
Flux<SWCharacter> all = ops.query(SWCharacter.class)
.inTable("star_wars") 1
.all();
如果
SWCharacter
使用@Table
定义了表名,或者使用类名作为表名没有问题,则跳过此步骤
如果 Cassandra 中的一个表保存了不同类型的实体,例如在 SWCharacters
表中保存 Jedi
,你可以使用不同的类型来映射查询结果。你可以使用 as(Class<?> targetType)
将结果映射到不同的目标类型,而 query(Class<?> entityType)
仍然适用于查询和表名。以下示例使用了 query
和 as
方法:
- 命令式
- 响应式
List<Jedi> all = ops.query(SWCharacter.class) 1
.as(Jedi.class) 2
.matching(query(where("jedi").is(true)))
.all();
查询字段映射到
SWCharacter
类型。结果行映射到
Jedi
。
Flux<Jedi> all = ops.query(SWCharacter.class) 1
.as(Jedi.class) 2
.matching(query(where("jedi").is(true)))
.all();
查询字段映射到
SWCharacter
类型。结果行映射到
Jedi
。
你可以通过仅提供 interface
类型通过 as(Class<?>)
直接将 Projections 应用到结果文档中。
终止方法(first()
、one()
、all()
和 stream()
)处理在检索单个实体和检索多个实体(作为 List
或 Stream
)之间的切换,以及类似操作。
新的流畅模板 API 方法(即 query(..)
、insert(..)
、update(..)
和 delete(..)
)有效地使用线程安全的支持对象来构建 CQL 语句。然而,这也带来了额外的年轻代 JVM 堆开销,因为该设计基于各个 CQL 语句组件的最终字段,并且在变更时进行构建。你在可能插入或删除大量对象时(例如在循环内)应小心。
保存、更新和删除行
[Reactive]CassandraTemplate
提供了一种简单的方法,让你能够保存、更新和删除域对象,并将这些对象映射到由 Cassandra 管理的表中。
类型映射
插入和更新行的方法
[Reactive]CassandraTemplate
提供了多个方便的方法来保存和插入你的对象。为了对转换过程有更细粒度的控制,你可以将 Spring 的 Converter
实例注册到 MappingCassandraConverter
中(例如,Converter<Row, Person>
)。
插入和更新操作之间的区别在于 INSERT
操作不插入 null
值。
使用 INSERT
操作的简单情况是保存一个 POJO。在这种情况下,表名由简单的类名决定(而不是完全限定的类名)。可以通过使用映射元数据来覆盖存储对象的表。
在插入或更新时,必须设置 id
属性。Apache Cassandra 没有生成 ID 的方法。
以下示例使用保存操作并检索其内容:
- Imperative
- Reactive
import static org.springframework.data.cassandra.core.query.Criteria.where;
import static org.springframework.data.cassandra.core.query.Query.query;
…
Person bob = new Person("Bob", 33);
cassandraTemplate.insert(bob);
Person queriedBob = cassandraTemplate.selectOneById(query(where("age").is(33)), Person.class);
import static org.springframework.data.cassandra.core.query.Criteria.where;
import static org.springframework.data.cassandra.core.query.Query.query;
…
Person bob = new Person("Bob", 33);
cassandraTemplate.insert(bob);
Mono<Person> queriedBob = reactiveCassandraTemplate.selectOneById(query(where("age").is(33)), Person.class);
您可以使用以下操作来插入和保存:
-
void
insert(Object objectToSave)
: 将对象插入到 Apache Cassandra 表中。 -
WriteResult
insert(Object objectToSave, InsertOptions options)
: 将对象插入到 Apache Cassandra 表中并应用InsertOptions
。
您可以使用以下更新操作:
-
void
更新(Object objectToSave)
: 在 Apache Cassandra 表中更新对象。 -
WriteResult
更新(Object objectToSave, UpdateOptions options)
: 在 Apache Cassandra 表中更新对象并应用UpdateOptions
。
您也可以使用传统的方法,编写您自己的 CQL 语句,如以下示例所示:
- Imperative
- Reactive
String cql = "INSERT INTO person (age, name) VALUES (39, 'Bob')";
cassandraTemplate().getCqlOperations().execute(cql);
String cql = "INSERT INTO person (age, name) VALUES (39, 'Bob')";
Mono<Boolean> applied = reactiveCassandraTemplate.getReactiveCqlOperations().execute(cql);
您还可以在使用 InsertOptions
和 UpdateOptions
时配置其他选项,例如 TTL、一致性级别和轻量级事务。
我的行被插入到哪个表中?
您可以通过两种方式管理用于操作表的表名。默认的表名是将简单类名修改为以小写字母开头。因此,com.example.Person
类的实例将存储在 person
表中。第二种方式是通过 @Table
注解指定表名。
在批处理中插入、更新和删除单个对象
Cassandra 协议支持通过使用批处理一次性插入一组行。
[Reactive]CassandraTemplate
接口中的以下方法支持此功能:
batchOps
: 创建一个新的[Reactive]CassandraBatchOperations
来填充批处理。
[Reactive]CassandraBatchOperations
-
insert
: 接受一个单一对象、一个数组(可变参数)或一个对象的Iterable
来插入。 -
update
: 接受一个单一对象、一个数组(可变参数)或一个对象的Iterable
来更新。 -
delete
: 接受一个单一对象、一个数组(可变参数)或一个对象的Iterable
来删除。 -
withTimestamp
: 为批处理应用一个 TTL(生存时间)。 -
execute
: 执行批处理。
更新表中的行
对于更新,您可以选择更新多个行。。
以下示例展示了通过使用 +
赋值操作符向余额中添加一次性 $50.00 奖金,从而更新单个账户对象:
- Imperative
- Reactive
import static org.springframework.data.cassandra.core.query.Criteria.where;
import org.springframework.data.cassandra.core.query.Query;
import org.springframework.data.cassandra.core.query.Update;
…
boolean applied = cassandraTemplate.update(Query.query(where("id").is("foo")),
Update.create().increment("balance", 50.00), Account.class);
import static org.springframework.data.cassandra.core.query.Criteria.where;
import org.springframework.data.cassandra.core.query.Query;
import org.springframework.data.cassandra.core.query.Update;
…
Mono<Boolean> wasApplied = reactiveCassandraTemplate.update(Query.query(where("id").is("foo")),
Update.create().increment("balance", 50.00), Account.class);
除了之前讨论的 Query
,我们通过使用 Update
对象提供更新定义。Update
类具有与 Apache Cassandra 可用的更新赋值相匹配的方法。
大多数方法返回 Update
对象,以提供流畅的 API 以便于代码风格的使用。
执行行更新的方法
更新方法可以更新行,如下所示:
boolean
update(Query query, Update update, Class<?> entityClass)
:更新 Apache Cassandra 表中的一组对象。
Update 类的方法
Update
类可以使用一些“语法糖”,因为它的方法旨在链式调用。此外,您可以通过静态方法 public static Update update(String key, Object value)
和使用静态导入来启动一个新的 Update
实例的创建。
Update
类具有以下方法:
-
AddToBuilder
addTo(String columnName)
AddToBuilder
入口点:-
更新
prepend(Object value)
:使用+
更新赋值将一个集合值添加到现有集合的前面。 -
更新
prependAll(Object… values)
:使用+
更新赋值将所有集合值添加到现有集合的前面。 -
更新
append(Object value)
:使用+
更新赋值将一个集合值添加到现有集合的后面。 -
更新
append(Object… values)
:使用+
更新赋值将所有集合值添加到现有集合的后面。 -
更新
entry(Object key, Object value)
:使用+
更新赋值添加一个映射条目。 -
更新
addAll(Map<? extends Object, ? extends Object> map)
:使用+
更新赋值将所有映射条目添加到映射中。
-
-
Update
remove(String columnName, Object value)
:使用-
更新赋值从集合中移除一个值。 -
Update
clear(String columnName)
:清空集合。 -
Update
increment(String columnName, Number delta)
:使用+
更新赋值进行更新。 -
Update
decrement(String columnName, Number delta)
:使用-
更新赋值进行更新。 -
Update
set(String columnName, Object value)
:使用=
更新赋值进行更新。 -
SetBuilder
set(String columnName)
SetBuilder
入口点:-
更新
atIndex(int index).to(Object value)
:使用=
更新赋值将集合中的指定索引设置为一个值。 -
更新
atKey(String object).to(Object value)
:使用=
更新赋值将映射中的指定键设置为一个值。
-
以下列表显示了一些更新示例:
// UPDATE … SET key = 'Spring Data';
Update.update("key", "Spring Data")
// UPDATE … SET key[5] = 'Spring Data';
Update.empty().set("key").atIndex(5).to("Spring Data");
// UPDATE … SET key = key + ['Spring', 'DATA'];
Update.empty().addTo("key").appendAll("Spring", "Data");
请注意,一旦创建,Update
是不可变的。调用方法会创建新的不可变(中间)Update
对象。
删除行的方法
您可以使用以下重载方法从数据库中删除对象:
-
boolean
delete(Query query, Class<?> entityClass)
:删除由Query
选中的对象。 -
T
delete(T entity)
:删除给定的对象。 -
T
delete(T entity, QueryOptions queryOptions)
:删除给定的对象并应用QueryOptions
。 -
boolean
deleteById(Object id, Class<?> entityClass)
:使用给定的 Id 删除对象。
乐观锁
@Version
注解提供了类似于 JPA 的语法,适用于 Cassandra,并确保仅将更新应用于具有匹配版本的行。乐观锁利用 Cassandra 的轻量级事务来有条件地插入、更新和删除行。因此,INSERT
语句会带有 IF NOT EXISTS
条件。对于更新和删除操作,版本属性的实际值会被添加到 UPDATE
条件中,以确保如果在此期间其他操作修改了行,则该修改不会产生任何效果。如果发生这种情况,会抛出 OptimisticLockingFailureException
。以下示例展示了这些特性:
@Table
class Person {
@Id String id;
String firstname;
String lastname;
@Version Long version;
}
Person daenerys = template.insert(new Person("Daenerys")); 1
Person tmp = template.findOne(query(where("id").is(daenerys.getId())), Person.class); 2
daenerys.setLastname("Targaryen");
template.save(daenerys); 3
template.save(tmp); // throws OptimisticLockingFailureException // <4>
初始插入文档。
version
被设置为0
。加载刚刚插入的文档。
version
仍然是0
。使用
version = 0
更新文档。设置lastname
并将version
提升到1
。尝试更新之前加载的文档,该文档仍然具有
version = 0
。由于当前的version
是1
,操作失败并抛出OptimisticLockingFailureException
。
乐观锁仅支持单实体操作,而不支持批量操作。