From 3107c4004a87ccdc295d6f12d5e33b4eb8648363 Mon Sep 17 00:00:00 2001 From: Michael Reiche <48999328+mikereiche@users.noreply.github.com> Date: Thu, 4 Aug 2022 20:31:52 -0700 Subject: [PATCH] Fix call to block() in CouchbaseCallbackTransactionManager. Closes #1527. --- .../data/couchbase/core/TransactionalSupport.java | 10 ++++++++++ .../CouchbaseCallbackTransactionManager.java | 12 +++++++----- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/TransactionalSupport.java b/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/TransactionalSupport.java index 6b56e8647..736424796 100644 --- a/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/TransactionalSupport.java +++ b/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/TransactionalSupport.java @@ -15,6 +15,7 @@ */ package org.springframework.data.couchbase.core; +import com.couchbase.client.core.transaction.threadlocal.TransactionMarker; import reactor.core.publisher.Mono; import java.util.Optional; @@ -26,6 +27,7 @@ import com.couchbase.client.core.error.transaction.TransactionOperationFailedException; import com.couchbase.client.core.transaction.CoreTransactionAttemptContext; import com.couchbase.client.core.transaction.threadlocal.TransactionMarkerOwner; +import reactor.util.context.ContextView; /** * Utility methods to support transactions. @@ -50,6 +52,14 @@ public static Mono> checkForTransactionInThrea }); } + public static Optional checkForTransactionInThreadLocalStorage(ContextView ctx) { + return Optional.ofNullable(ctx.hasKey(TransactionMarker.class) ? new CouchbaseResourceHolder(ctx.get(TransactionMarker.class).context()) : null); + } + + //public static Optional blockingCheckForTransactionInThreadLocalStorage() { + // return TransactionMarkerOwner.marker; + // } + public static Mono verifyNotInTransaction(String methodName) { return checkForTransactionInThreadLocalStorage().flatMap(s -> { if (s.isPresent()) { diff --git a/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseCallbackTransactionManager.java b/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseCallbackTransactionManager.java index e6befd9d9..1da999f66 100644 --- a/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseCallbackTransactionManager.java +++ b/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseCallbackTransactionManager.java @@ -44,6 +44,7 @@ import com.couchbase.client.java.transactions.config.TransactionOptions; import com.couchbase.client.java.transactions.error.TransactionCommitAmbiguousException; import com.couchbase.client.java.transactions.error.TransactionFailedException; +import reactor.util.context.ContextView; /** * The Couchbase transaction manager, providing support for @Transactional methods. @@ -73,7 +74,7 @@ public CouchbaseCallbackTransactionManager(CouchbaseClientFactory couchbaseClien @Override public T execute(TransactionDefinition definition, TransactionCallback callback) throws TransactionException { - boolean createNewTransaction = handlePropagation(definition); + boolean createNewTransaction = handlePropagation(definition, null); setOptionsFromDefinition(definition); @@ -87,8 +88,8 @@ public T execute(TransactionDefinition definition, TransactionCallback ca @Stability.Internal Flux executeReactive(TransactionDefinition definition, org.springframework.transaction.reactive.TransactionCallback callback) { - return Flux.defer(() -> { - boolean createNewTransaction = handlePropagation(definition); + return Flux.deferContextual((ctx) -> { + boolean createNewTransaction = handlePropagation(definition, ctx); setOptionsFromDefinition(definition); @@ -187,8 +188,9 @@ public boolean isCompleted() { } // Propagation defines what happens when a @Transactional method is called from another @Transactional method. - private boolean handlePropagation(TransactionDefinition definition) { - boolean isExistingTransaction = TransactionalSupport.checkForTransactionInThreadLocalStorage().block().isPresent(); + private boolean handlePropagation(TransactionDefinition definition, ContextView ctx) { + boolean isExistingTransaction = ctx != null ? TransactionalSupport.checkForTransactionInThreadLocalStorage(ctx).isPresent() : + TransactionalSupport.checkForTransactionInThreadLocalStorage().block().isPresent(); LOGGER.trace("Deciding propagation behaviour from {} and {}", definition.getPropagationBehavior(), isExistingTransaction);