44
44
import com .couchbase .client .java .transactions .config .TransactionOptions ;
45
45
import com .couchbase .client .java .transactions .error .TransactionCommitAmbiguousException ;
46
46
import com .couchbase .client .java .transactions .error .TransactionFailedException ;
47
+ import reactor .util .context .ContextView ;
47
48
48
49
/**
49
50
* The Couchbase transaction manager, providing support for @Transactional methods.
@@ -73,7 +74,7 @@ public CouchbaseCallbackTransactionManager(CouchbaseClientFactory couchbaseClien
73
74
74
75
@ Override
75
76
public <T > T execute (TransactionDefinition definition , TransactionCallback <T > callback ) throws TransactionException {
76
- boolean createNewTransaction = handlePropagation (definition );
77
+ boolean createNewTransaction = handlePropagation (definition , null );
77
78
78
79
setOptionsFromDefinition (definition );
79
80
@@ -87,8 +88,8 @@ public <T> T execute(TransactionDefinition definition, TransactionCallback<T> ca
87
88
@ Stability .Internal
88
89
<T > Flux <T > executeReactive (TransactionDefinition definition ,
89
90
org .springframework .transaction .reactive .TransactionCallback <T > callback ) {
90
- return Flux .defer (( ) -> {
91
- boolean createNewTransaction = handlePropagation (definition );
91
+ return Flux .deferContextual (( ctx ) -> {
92
+ boolean createNewTransaction = handlePropagation (definition , ctx );
92
93
93
94
setOptionsFromDefinition (definition );
94
95
@@ -187,8 +188,9 @@ public boolean isCompleted() {
187
188
}
188
189
189
190
// Propagation defines what happens when a @Transactional method is called from another @Transactional method.
190
- private boolean handlePropagation (TransactionDefinition definition ) {
191
- boolean isExistingTransaction = TransactionalSupport .checkForTransactionInThreadLocalStorage ().block ().isPresent ();
191
+ private boolean handlePropagation (TransactionDefinition definition , ContextView ctx ) {
192
+ boolean isExistingTransaction = ctx != null ? TransactionalSupport .checkForTransactionInThreadLocalStorage (ctx ).isPresent () :
193
+ TransactionalSupport .checkForTransactionInThreadLocalStorage ().block ().isPresent ();
192
194
193
195
LOGGER .trace ("Deciding propagation behaviour from {} and {}" , definition .getPropagationBehavior (),
194
196
isExistingTransaction );
0 commit comments