Skip to content

Commit 3cc9af0

Browse files
committed
Add support for Couchbase Transactions.
The fluent operations are common for options that are common to operations with and without transactions. Once there is a transaction(ctx), or an option specific to without-transactions (such as scanConsistency), the interfaces are bifurcated, so that an non-transaction option cannot be applied to a transaction operation, and a transaction(ctx) cannot be applied where a non-transaction option has already been applied. Closes #1145.
1 parent 3ec3a30 commit 3cc9af0

File tree

75 files changed

+2263
-1251
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

75 files changed

+2263
-1251
lines changed

pom.xml

+9
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
<couchbase.osgi>3.2.5</couchbase.osgi>
2323
<springdata.commons>2.7.0-SNAPSHOT</springdata.commons>
2424
<java-module-name>spring.data.couchbase</java-module-name>
25+
<couchbase-transactions>1.2.2</couchbase-transactions>
2526
</properties>
2627

2728
<dependencyManagement>
@@ -37,6 +38,14 @@
3738
</dependencyManagement>
3839

3940
<dependencies>
41+
42+
<dependency>
43+
<groupId>com.couchbase.client</groupId>
44+
<artifactId>couchbase-transactions</artifactId>
45+
<version>${couchbase-transactions}</version>
46+
</dependency>
47+
48+
4049
<dependency>
4150
<groupId>org.springframework</groupId>
4251
<artifactId>spring-context-support</artifactId>

src/main/java/org/springframework/data/couchbase/SimpleCouchbaseClientFactory.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,9 @@ public Scope getScope() {
9797
@Override
9898
public Collection getCollection(final String collectionName) {
9999
final Scope scope = getScope();
100-
if (collectionName == null) {
100+
if (collectionName == null || CollectionIdentifier.DEFAULT_COLLECTION.equals(collectionName)) {
101101
if (!scope.name().equals(CollectionIdentifier.DEFAULT_SCOPE)) {
102-
throw new IllegalStateException("A collectionName must be provided if a non-default scope is used!");
102+
throw new IllegalStateException("A collectionName must be provided if a non-default scope is used");
103103
}
104104
return getBucket().defaultCollection();
105105
}

src/main/java/org/springframework/data/couchbase/config/AbstractCouchbaseConfiguration.java

+21-3
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,15 @@
1717
package org.springframework.data.couchbase.config;
1818

1919
import static com.couchbase.client.java.ClusterOptions.clusterOptions;
20+
import static org.springframework.data.couchbase.config.BeanNames.COUCHBASE_MAPPING_CONTEXT;
21+
import static org.springframework.data.couchbase.config.BeanNames.COUCHBASE_TRANSACTIONS;
2022

23+
import java.time.Duration;
2124
import java.util.Collections;
2225
import java.util.HashSet;
2326
import java.util.Set;
2427

28+
import com.couchbase.client.java.query.QueryScanConsistency;
2529
import org.springframework.beans.factory.config.BeanDefinition;
2630
import org.springframework.context.annotation.Bean;
2731
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
@@ -46,6 +50,7 @@
4650
import org.springframework.util.ClassUtils;
4751
import org.springframework.util.StringUtils;
4852

53+
import com.couchbase.client.core.cnc.Event;
4954
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.DeserializationFeature;
5055
import com.couchbase.client.core.encryption.CryptoManager;
5156
import com.couchbase.client.core.env.Authenticator;
@@ -57,7 +62,10 @@
5762
import com.couchbase.client.java.env.ClusterEnvironment;
5863
import com.couchbase.client.java.json.JacksonTransformers;
5964
import com.couchbase.client.java.json.JsonValueModule;
60-
import com.couchbase.client.java.query.QueryScanConsistency;
65+
import com.couchbase.transactions.TransactionDurabilityLevel;
66+
import com.couchbase.transactions.Transactions;
67+
import com.couchbase.transactions.config.TransactionConfig;
68+
import com.couchbase.transactions.config.TransactionConfigBuilder;
6169
import com.fasterxml.jackson.databind.ObjectMapper;
6270

6371
/**
@@ -123,7 +131,7 @@ protected Authenticator authenticator() {
123131
* @param couchbaseCluster the cluster reference from the SDK.
124132
* @return the initialized factory.
125133
*/
126-
@Bean
134+
@Bean(name = BeanNames.COUCHBASE_CLIENT_FACTORY)
127135
public CouchbaseClientFactory couchbaseClientFactory(final Cluster couchbaseCluster) {
128136
return new SimpleCouchbaseClientFactory(couchbaseCluster, getBucketName(), getScopeName());
129137
}
@@ -282,7 +290,7 @@ public TranslationService couchbaseTranslationService() {
282290
* Creates a {@link CouchbaseMappingContext} equipped with entity classes scanned from the mapping base package.
283291
*
284292
*/
285-
@Bean
293+
@Bean(COUCHBASE_MAPPING_CONTEXT)
286294
public CouchbaseMappingContext couchbaseMappingContext(CustomConversions customConversions) throws Exception {
287295
CouchbaseMappingContext mappingContext = new CouchbaseMappingContext();
288296
mappingContext.setInitialEntitySet(getInitialEntitySet());
@@ -310,6 +318,16 @@ public ObjectMapper couchbaseObjectMapper() {
310318
return mapper;
311319
}
312320

321+
@Bean(COUCHBASE_TRANSACTIONS)
322+
public Transactions getTransactions(Cluster cluster) {
323+
return Transactions.create(cluster, getTransactionConfig());
324+
}
325+
326+
TransactionConfig getTransactionConfig() {
327+
return TransactionConfigBuilder.create().logDirectly(Event.Severity.INFO).logOnFailure(true, Event.Severity.ERROR)
328+
.expirationTime(Duration.ofMinutes(10)).durabilityLevel(TransactionDurabilityLevel.NONE).build();
329+
}
330+
313331
/**
314332
* Configure whether to automatically create indices for domain types by deriving the from the entity or not.
315333
*/

src/main/java/org/springframework/data/couchbase/config/BeanNames.java

+5
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ public class BeanNames {
3434

3535
public static final String COUCHBASE_CUSTOM_CONVERSIONS = "couchbaseCustomConversions";
3636

37+
public static final String COUCHBASE_TRANSACTIONS = "couchbaseTransactions";
38+
3739
/**
3840
* The name for the bean that stores custom mapping between repositories and their backing couchbaseOperations.
3941
*/
@@ -59,4 +61,7 @@ public class BeanNames {
5961
* The name for the bean that will handle reactive audit trail marking of entities.
6062
*/
6163
public static final String REACTIVE_COUCHBASE_AUDITING_HANDLER = "reactiveCouchbaseAuditingHandler";
64+
65+
public static final String COUCHBASE_CLIENT_FACTORY = "couchbaseClientFactory";
66+
6267
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
package org.springframework.data.couchbase.core;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
import org.springframework.context.ApplicationContext;
6+
import org.springframework.data.couchbase.core.convert.CouchbaseConverter;
7+
import org.springframework.data.couchbase.core.convert.join.N1qlJoinResolver;
8+
import org.springframework.data.couchbase.core.convert.translation.TranslationService;
9+
import org.springframework.data.couchbase.core.mapping.CouchbaseDocument;
10+
import org.springframework.data.couchbase.core.mapping.CouchbasePersistentEntity;
11+
import org.springframework.data.couchbase.core.mapping.CouchbasePersistentProperty;
12+
import org.springframework.data.couchbase.core.mapping.event.AfterSaveEvent;
13+
import org.springframework.data.couchbase.core.mapping.event.CouchbaseMappingEvent;
14+
import org.springframework.data.couchbase.repository.support.MappingCouchbaseEntityInformation;
15+
import org.springframework.data.couchbase.repository.support.TransactionResultHolder;
16+
import org.springframework.data.mapping.PersistentPropertyAccessor;
17+
import org.springframework.data.mapping.context.MappingContext;
18+
import org.springframework.data.mapping.model.ConvertingPropertyAccessor;
19+
20+
import com.couchbase.client.core.error.CouchbaseException;
21+
import org.springframework.util.ClassUtils;
22+
23+
import java.util.Map;
24+
import java.util.Set;
25+
26+
public abstract class AbstractTemplateSupport {
27+
28+
final ReactiveCouchbaseTemplate template;
29+
final CouchbaseConverter converter;
30+
final MappingContext<? extends CouchbasePersistentEntity<?>, CouchbasePersistentProperty> mappingContext;
31+
final TranslationService translationService;
32+
ApplicationContext applicationContext;
33+
static final Logger LOG = LoggerFactory.getLogger(AbstractTemplateSupport.class);
34+
35+
public AbstractTemplateSupport(ReactiveCouchbaseTemplate template, CouchbaseConverter converter, TranslationService translationService) {
36+
this.template = template;
37+
this.converter = converter;
38+
this.mappingContext = converter.getMappingContext();
39+
this.translationService = translationService;
40+
}
41+
42+
abstract ReactiveCouchbaseTemplate getReactiveTemplate();
43+
44+
public <T> T decodeEntityBase(String id, String source, long cas, Class<T> entityClass,
45+
String scope, String collection, TransactionResultHolder txResultHolder) {
46+
47+
final CouchbaseDocument converted = new CouchbaseDocument(id);
48+
converted.setId(id);
49+
50+
// this is the entity class defined for the repository. It may not be the class of the document that was read
51+
// we will reset it after reading the document
52+
//
53+
// This will fail for the case where:
54+
// 1) The version is defined in the concrete class, but not in the abstract class; and
55+
// 2) The constructor takes a "long version" argument resulting in an exception would be thrown if version in
56+
// the source is null.
57+
// We could expose from the MappingCouchbaseConverter determining the persistent entity from the source,
58+
// but that is a lot of work to do every time just for this very rare and avoidable case.
59+
// TypeInformation<? extends R> typeToUse = typeMapper.readType(source, type);
60+
61+
CouchbasePersistentEntity persistentEntity = couldBePersistentEntity(entityClass);
62+
63+
if (persistentEntity == null) { // method could return a Long, Boolean, String etc.
64+
// QueryExecutionConverters.unwrapWrapperTypes will recursively unwrap until there is nothing left
65+
// to unwrap. This results in List<String[]> being unwrapped past String[] to String, so this may also be a
66+
// Collection (or Array) of entityClass. We have no way of knowing - so just assume it is what we are told.
67+
// if this is a Collection or array, only the first element will be returned.
68+
Set<Map.Entry<String, Object>> set = ((CouchbaseDocument) translationService.decode(source, converted))
69+
.getContent().entrySet();
70+
return (T) set.iterator().next().getValue();
71+
}
72+
73+
// if possible, set the version property in the source so that if the constructor has a long version argument,
74+
// it will have a value an not fail (as null is not a valid argument for a long argument). This possible failure
75+
// can be avoid by defining the argument as Long instead of long.
76+
// persistentEntity is still the (possibly abstract) class specified in the repository definition
77+
// it's possible that the abstract class does not have a version property, and this won't be able to set the version
78+
if (cas != 0 && persistentEntity.getVersionProperty() != null) {
79+
converted.put(persistentEntity.getVersionProperty().getName(), cas);
80+
}
81+
82+
// if the constructor has an argument that is long version, then construction will fail if the 'version'
83+
// is not available as 'null' is not a legal value for a long. Changing the arg to "Long version" would solve this.
84+
// (Version doesn't come from 'source', it comes from the cas argument to decodeEntity)
85+
T readEntity = converter.read(entityClass, (CouchbaseDocument) translationService.decode(source, converted));
86+
final ConvertingPropertyAccessor<T> accessor = getPropertyAccessor(readEntity);
87+
88+
persistentEntity = couldBePersistentEntity(readEntity.getClass());
89+
90+
if (cas != 0 && persistentEntity.getVersionProperty() != null) {
91+
accessor.setProperty(persistentEntity.getVersionProperty(), cas);
92+
}
93+
N1qlJoinResolver.handleProperties(persistentEntity, accessor, template, id, scope, collection);
94+
return accessor.getBean();
95+
}
96+
97+
CouchbasePersistentEntity couldBePersistentEntity(Class<?> entityClass) {
98+
if (ClassUtils.isPrimitiveOrWrapper(entityClass) || entityClass == String.class) {
99+
return null;
100+
}
101+
return mappingContext.getPersistentEntity(entityClass);
102+
}
103+
104+
105+
106+
public <T> T applyResultBase(T entity, CouchbaseDocument converted, Object id, long cas,
107+
TransactionResultHolder txResultHolder) {
108+
ConvertingPropertyAccessor<Object> accessor = getPropertyAccessor(entity);
109+
110+
final CouchbasePersistentEntity<?> persistentEntity = converter.getMappingContext()
111+
.getRequiredPersistentEntity(entity.getClass());
112+
113+
final CouchbasePersistentProperty idProperty = persistentEntity.getIdProperty();
114+
if (idProperty != null) {
115+
accessor.setProperty(idProperty, id);
116+
}
117+
118+
final CouchbasePersistentProperty versionProperty = persistentEntity.getVersionProperty();
119+
if (versionProperty != null) {
120+
accessor.setProperty(versionProperty, cas);
121+
}
122+
123+
final CouchbasePersistentProperty transactionResultProperty = persistentEntity.transactionResultProperty();
124+
if (transactionResultProperty != null) {
125+
accessor.setProperty(transactionResultProperty, txResultHolder);
126+
}
127+
maybeEmitEvent(new AfterSaveEvent(accessor.getBean(), converted));
128+
return (T) accessor.getBean();
129+
130+
}
131+
132+
public Long getCas(final Object entity) {
133+
final ConvertingPropertyAccessor<Object> accessor = getPropertyAccessor(entity);
134+
final CouchbasePersistentEntity<?> persistentEntity = mappingContext.getRequiredPersistentEntity(entity.getClass());
135+
final CouchbasePersistentProperty versionProperty = persistentEntity.getVersionProperty();
136+
137+
long cas = 0;
138+
if (versionProperty != null) {
139+
Object casObject = accessor.getProperty(versionProperty);
140+
if (casObject instanceof Number) {
141+
cas = ((Number) casObject).longValue();
142+
}
143+
}
144+
return cas;
145+
}
146+
147+
public String getJavaNameForEntity(final Class<?> clazz) {
148+
final CouchbasePersistentEntity<?> persistentEntity = mappingContext.getRequiredPersistentEntity(clazz);
149+
MappingCouchbaseEntityInformation<?, Object> info = new MappingCouchbaseEntityInformation<>(persistentEntity);
150+
return info.getJavaType().getName();
151+
}
152+
153+
<T> ConvertingPropertyAccessor<T> getPropertyAccessor(final T source) {
154+
CouchbasePersistentEntity<?> entity = mappingContext.getRequiredPersistentEntity(source.getClass());
155+
PersistentPropertyAccessor<T> accessor = entity.getPropertyAccessor(source);
156+
return new ConvertingPropertyAccessor<>(accessor, converter.getConversionService());
157+
}
158+
159+
public <T> TransactionResultHolder getTxResultHolder(T source) {
160+
final CouchbasePersistentEntity<?> persistentEntity = mappingContext.getRequiredPersistentEntity(source.getClass());
161+
final CouchbasePersistentProperty transactionResultProperty = persistentEntity.transactionResultProperty();
162+
if (transactionResultProperty == null) {
163+
throw new CouchbaseException("the entity class " + source.getClass()
164+
+ " does not have a property required for transactions:\n\t@TransactionResult TransactionResultHolder txResultHolder");
165+
}
166+
return getPropertyAccessor(source).getProperty(transactionResultProperty, TransactionResultHolder.class);
167+
}
168+
169+
public void maybeEmitEvent(CouchbaseMappingEvent<?> event) {
170+
if (canPublishEvent()) {
171+
try {
172+
this.applicationContext.publishEvent(event);
173+
} catch (Exception e) {
174+
LOG.warn("{} thrown during {}", e, event);
175+
throw e;
176+
}
177+
} else {
178+
LOG.info("maybeEmitEvent called, but CouchbaseTemplate not initialized with applicationContext");
179+
}
180+
181+
}
182+
183+
private boolean canPublishEvent() {
184+
return this.applicationContext != null;
185+
}
186+
}

0 commit comments

Comments
 (0)