Skip to content

Commit 463fef6

Browse files
committed
Fix block() issues with reactive transactions.
Closes #1530.
1 parent 0b40ca1 commit 463fef6

File tree

6 files changed

+67
-39
lines changed

6 files changed

+67
-39
lines changed

spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java

+40-27
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public ReactiveCouchbaseTemplate(final CouchbaseClientFactory clientFactory, fin
5959
}
6060

6161
public ReactiveCouchbaseTemplate(final CouchbaseClientFactory clientFactory, final CouchbaseConverter converter,
62-
final TranslationService translationService) {
62+
final TranslationService translationService) {
6363
this(clientFactory, converter, translationService, null);
6464
}
6565

@@ -75,35 +75,48 @@ public ReactiveCouchbaseTemplate(final CouchbaseClientFactory clientFactory, fin
7575
@Override
7676
public <T> Mono<T> save(T entity, String... scopeAndCollection) {
7777
Assert.notNull(entity, "Entity must not be null!");
78+
7879
String scope = scopeAndCollection.length > 0 ? scopeAndCollection[0] : null;
7980
String collection = scopeAndCollection.length > 1 ? scopeAndCollection[1] : null;
80-
Mono<T> result;
81-
final CouchbasePersistentEntity<?> mapperEntity = getConverter().getMappingContext()
82-
.getPersistentEntity(entity.getClass());
83-
final CouchbasePersistentProperty versionProperty = mapperEntity.getVersionProperty();
84-
final boolean versionPresent = versionProperty != null;
85-
final Long version = versionProperty == null || versionProperty.getField() == null ? null
86-
: (Long) ReflectionUtils.getField(versionProperty.getField(), entity);
87-
final boolean existingDocument = version != null && version > 0;
88-
89-
Class clazz = entity.getClass();
90-
91-
if (!versionPresent) { // the entity doesn't have a version property
92-
// No version field - no cas
93-
// If in a transaction, insert is the only thing that will work
94-
if (TransactionalSupport.checkForTransactionInThreadLocalStorage().block().isPresent()) {
95-
result = (Mono<T>) insertById(clazz).inScope(scope).inCollection(collection).one(entity);
96-
} else { // if not in a tx, then upsert will work
97-
result = (Mono<T>) upsertById(clazz).inScope(scope).inCollection(collection).one(entity);
81+
return Mono.deferContextual(xxx -> {
82+
Mono<T> result;
83+
final CouchbasePersistentEntity<?> mapperEntity = getConverter().getMappingContext()
84+
.getPersistentEntity(entity.getClass());
85+
final CouchbasePersistentProperty versionProperty = mapperEntity.getVersionProperty();
86+
final boolean versionPresent = versionProperty != null;
87+
final Long version = versionProperty == null || versionProperty.getField() == null ? null
88+
: (Long) ReflectionUtils.getField(versionProperty.getField(),
89+
entity);
90+
final boolean existingDocument = version != null && version > 0;
91+
92+
Class clazz = entity.getClass();
93+
94+
if (!versionPresent) { // the entity doesn't have a version property
95+
// No version field - no cas
96+
// If in a transaction, insert is the only thing that will work
97+
if (TransactionalSupport.checkForTransactionInThreadLocalStorage(xxx)
98+
.isPresent()) {
99+
result = (Mono<T>) insertById(clazz).inScope(scope)
100+
.inCollection(collection)
101+
.one(entity);
102+
} else { // if not in a tx, then upsert will work
103+
result = (Mono<T>) upsertById(clazz).inScope(scope)
104+
.inCollection(collection)
105+
.one(entity);
106+
}
107+
} else if (existingDocument) { // there is a version property, and it is non-zero
108+
// Updating existing document with cas
109+
result = (Mono<T>) replaceById(clazz).inScope(scope)
110+
.inCollection(collection)
111+
.one(entity);
112+
} else { // there is a version property, but it's zero or not set.
113+
// Creating new document
114+
result = (Mono<T>) insertById(clazz).inScope(scope)
115+
.inCollection(collection)
116+
.one(entity);
98117
}
99-
} else if (existingDocument) { // there is a version property, and it is non-zero
100-
// Updating existing document with cas
101-
result = (Mono<T>) replaceById(clazz).inScope(scope).inCollection(collection).one(entity);
102-
} else { // there is a version property, but it's zero or not set.
103-
// Creating new document
104-
result = (Mono<T>) insertById(clazz).inScope(scope).inCollection(collection).one(entity);
105-
}
106-
return result;
118+
return result;
119+
});
107120
}
108121

109122
public <T> Mono<Long> count(Query query, Class<T> domainType) {

spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/TransactionalSupport.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public static Optional<CouchbaseResourceHolder> checkForTransactionInThreadLocal
6363
public static Mono<Void> verifyNotInTransaction(String methodName) {
6464
return checkForTransactionInThreadLocalStorage().flatMap(s -> {
6565
if (s.isPresent()) {
66-
return Mono.error(new IllegalArgumentException(methodName + "can not be used inside a transaction"));
66+
return Mono.error(new IllegalArgumentException(methodName + " can not be used inside a transaction"));
6767
} else {
6868
return Mono.empty();
6969
}

spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/mapping/CouchbaseList.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ private void verifyValueType(final Object value) {
202202
}
203203

204204
throw new IllegalArgumentException(
205-
"Attribute of type " + clazz.getCanonicalName() + "can not be stored and must be converted.");
205+
"Attribute of type " + clazz.getCanonicalName() + " can not be stored and must be converted.");
206206
}
207207

208208
/**

spring-data-couchbase/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseCallbackTransactionManager.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import com.couchbase.client.java.transactions.config.TransactionOptions;
4545
import com.couchbase.client.java.transactions.error.TransactionCommitAmbiguousException;
4646
import com.couchbase.client.java.transactions.error.TransactionFailedException;
47-
import reactor.util.context.ContextView;
4847

4948
/**
5049
* The Couchbase transaction manager, providing support for @Transactional methods.
@@ -74,7 +73,9 @@ public CouchbaseCallbackTransactionManager(CouchbaseClientFactory couchbaseClien
7473

7574
@Override
7675
public <T> T execute(TransactionDefinition definition, TransactionCallback<T> callback) throws TransactionException {
77-
boolean createNewTransaction = handlePropagation(definition, null);
76+
boolean isInExistingTransaction = TransactionalSupport.checkForTransactionInThreadLocalStorage().block()
77+
.isPresent();
78+
boolean createNewTransaction = handlePropagation(definition, isInExistingTransaction);
7879

7980
setOptionsFromDefinition(definition);
8081

@@ -88,8 +89,9 @@ public <T> T execute(TransactionDefinition definition, TransactionCallback<T> ca
8889
@Stability.Internal
8990
<T> Flux<T> executeReactive(TransactionDefinition definition,
9091
org.springframework.transaction.reactive.TransactionCallback<T> callback) {
91-
return Flux.deferContextual((ctx) -> {
92-
boolean createNewTransaction = handlePropagation(definition, ctx);
92+
return TransactionalSupport.checkForTransactionInThreadLocalStorage().flatMapMany(isInTransaction -> {
93+
boolean isInExistingTransaction = isInTransaction.isPresent();
94+
boolean createNewTransaction = handlePropagation(definition, isInExistingTransaction);
9395

9496
setOptionsFromDefinition(definition);
9597

@@ -188,9 +190,7 @@ public boolean isCompleted() {
188190
}
189191

190192
// Propagation defines what happens when a @Transactional method is called from another @Transactional method.
191-
private boolean handlePropagation(TransactionDefinition definition, ContextView ctx) {
192-
boolean isExistingTransaction = ctx != null ? TransactionalSupport.checkForTransactionInThreadLocalStorage(ctx).isPresent() :
193-
TransactionalSupport.checkForTransactionInThreadLocalStorage().block().isPresent();
193+
private Boolean handlePropagation(TransactionDefinition definition, boolean isExistingTransaction) {
194194

195195
LOGGER.trace("Deciding propagation behaviour from {} and {}", definition.getPropagationBehavior(),
196196
isExistingTransaction);

spring-data-couchbase/src/test/java/org/springframework/data/couchbase/transactions/CouchbasePersonTransactionReactiveIntegrationTests.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static com.couchbase.client.java.query.QueryScanConsistency.REQUEST_PLUS;
2020

2121
import lombok.Data;
22+
import org.springframework.data.couchbase.domain.PersonWithoutVersion;
2223
import reactor.core.publisher.Mono;
2324
import reactor.test.StepVerifier;
2425

@@ -49,9 +50,9 @@
4950

5051
/**
5152
* todo gp: these tests are using the `.as(transactionalOperator::transactional)` method which is for the chopping
52-
* block, so presumably these tests are too
53-
* todo mr: I'm not sure how as(transactionalOperator::transactional) is different than
54-
* todo mr: transactionOperator.transaction(...)in CouchbaseTransactionalOperatorTemplateIntegrationTests ?
53+
* block, so presumably these tests are too todo mr: I'm not sure how as(transactionalOperator::transactional) is
54+
* different than todo mr: transactionOperator.transaction(...)in CouchbaseTransactionalOperatorTemplateIntegrationTests
55+
* ?
5556
*
5657
* @author Michael Reiche
5758
*/
@@ -72,6 +73,7 @@ public class CouchbasePersonTransactionReactiveIntegrationTests extends JavaInte
7273
String sName = "_default";
7374
String cName = "_default";
7475
Person WalterWhite;
76+
PersonWithoutVersion BobbyBlackWithoutVersion;
7577

7678
@BeforeAll
7779
public static void beforeAll() {
@@ -86,6 +88,7 @@ public static void afterAll() {
8688
@BeforeEach
8789
public void beforeEachTest() {
8890
WalterWhite = new Person("Walter", "White");
91+
BobbyBlackWithoutVersion = new PersonWithoutVersion("Bobby", "Black");
8992
TransactionTestUtil.assertNotInTransaction();
9093
List<RemoveResult> pr = operations.removeByQuery(Person.class).withConsistency(REQUEST_PLUS).all().collectList()
9194
.block();
@@ -140,6 +143,12 @@ public void commitShouldPersistTxEntriesOfTxAnnotatedMethod() {
140143

141144
}
142145

146+
@Test
147+
public void commitShouldPersistTxEntriesOfTxAnnotatedMethodNoVersion() {
148+
personService.declarativeSavePersonWithoutVersion(BobbyBlackWithoutVersion).as(StepVerifier::create) //
149+
.expectError(UnsupportedOperationException.class); //
150+
}
151+
143152
@Test
144153
public void commitShouldPersistTxEntriesAcrossCollections() {
145154

spring-data-couchbase/src/test/java/org/springframework/data/couchbase/transactions/PersonServiceReactive.java

+6
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.junit.jupiter.api.Assertions.assertTrue;
2121

2222
import org.springframework.data.couchbase.core.TransactionalSupport;
23+
import org.springframework.data.couchbase.domain.PersonWithoutVersion;
2324
import org.springframework.stereotype.Service;
2425
import reactor.core.publisher.Flux;
2526
import reactor.core.publisher.Mono;
@@ -109,6 +110,11 @@ public Mono<Person> declarativeSavePerson(Person person) {
109110
return personOperationsRx.save(person);
110111
}
111112

113+
@Transactional
114+
public Mono<PersonWithoutVersion> declarativeSavePersonWithoutVersion(PersonWithoutVersion person) {
115+
return personOperationsRx.save(person);
116+
}
117+
112118
@Transactional
113119
public Mono<Person> declarativeSavePersonErrors(Person person) {
114120
return personOperationsRx.insertById(Person.class).one(person)

0 commit comments

Comments
 (0)