Skip to content

Commit 3107c40

Browse files
committed
Fix call to block() in CouchbaseCallbackTransactionManager.
Closes #1527.
1 parent a92ebd0 commit 3107c40

File tree

2 files changed

+17
-5
lines changed

2 files changed

+17
-5
lines changed

spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/TransactionalSupport.java

+10
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package org.springframework.data.couchbase.core;
1717

18+
import com.couchbase.client.core.transaction.threadlocal.TransactionMarker;
1819
import reactor.core.publisher.Mono;
1920

2021
import java.util.Optional;
@@ -26,6 +27,7 @@
2627
import com.couchbase.client.core.error.transaction.TransactionOperationFailedException;
2728
import com.couchbase.client.core.transaction.CoreTransactionAttemptContext;
2829
import com.couchbase.client.core.transaction.threadlocal.TransactionMarkerOwner;
30+
import reactor.util.context.ContextView;
2931

3032
/**
3133
* Utility methods to support transactions.
@@ -50,6 +52,14 @@ public static Mono<Optional<CouchbaseResourceHolder>> checkForTransactionInThrea
5052
});
5153
}
5254

55+
public static Optional<CouchbaseResourceHolder> checkForTransactionInThreadLocalStorage(ContextView ctx) {
56+
return Optional.ofNullable(ctx.hasKey(TransactionMarker.class) ? new CouchbaseResourceHolder(ctx.get(TransactionMarker.class).context()) : null);
57+
}
58+
59+
//public static Optional<CouchbaseResourceHolder> blockingCheckForTransactionInThreadLocalStorage() {
60+
// return TransactionMarkerOwner.marker;
61+
// }
62+
5363
public static Mono<Void> verifyNotInTransaction(String methodName) {
5464
return checkForTransactionInThreadLocalStorage().flatMap(s -> {
5565
if (s.isPresent()) {

spring-data-couchbase/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseCallbackTransactionManager.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import com.couchbase.client.java.transactions.config.TransactionOptions;
4545
import com.couchbase.client.java.transactions.error.TransactionCommitAmbiguousException;
4646
import com.couchbase.client.java.transactions.error.TransactionFailedException;
47+
import reactor.util.context.ContextView;
4748

4849
/**
4950
* The Couchbase transaction manager, providing support for @Transactional methods.
@@ -73,7 +74,7 @@ public CouchbaseCallbackTransactionManager(CouchbaseClientFactory couchbaseClien
7374

7475
@Override
7576
public <T> T execute(TransactionDefinition definition, TransactionCallback<T> callback) throws TransactionException {
76-
boolean createNewTransaction = handlePropagation(definition);
77+
boolean createNewTransaction = handlePropagation(definition, null);
7778

7879
setOptionsFromDefinition(definition);
7980

@@ -87,8 +88,8 @@ public <T> T execute(TransactionDefinition definition, TransactionCallback<T> ca
8788
@Stability.Internal
8889
<T> Flux<T> executeReactive(TransactionDefinition definition,
8990
org.springframework.transaction.reactive.TransactionCallback<T> callback) {
90-
return Flux.defer(() -> {
91-
boolean createNewTransaction = handlePropagation(definition);
91+
return Flux.deferContextual((ctx) -> {
92+
boolean createNewTransaction = handlePropagation(definition, ctx);
9293

9394
setOptionsFromDefinition(definition);
9495

@@ -187,8 +188,9 @@ public boolean isCompleted() {
187188
}
188189

189190
// Propagation defines what happens when a @Transactional method is called from another @Transactional method.
190-
private boolean handlePropagation(TransactionDefinition definition) {
191-
boolean isExistingTransaction = TransactionalSupport.checkForTransactionInThreadLocalStorage().block().isPresent();
191+
private boolean handlePropagation(TransactionDefinition definition, ContextView ctx) {
192+
boolean isExistingTransaction = ctx != null ? TransactionalSupport.checkForTransactionInThreadLocalStorage(ctx).isPresent() :
193+
TransactionalSupport.checkForTransactionInThreadLocalStorage().block().isPresent();
192194

193195
LOGGER.trace("Deciding propagation behaviour from {} and {}", definition.getPropagationBehavior(),
194196
isExistingTransaction);

0 commit comments

Comments
 (0)