Skip to content

Commit 1e71f1c

Browse files
authored
Datacouch 1145 transaction support (#1447)
* Move CouchbaseTransactionalOperator to use SLF4J, same as rest of the code. * Handle all propagation levels * Adding new tests for repository calls inside @transactional One test is failure due to what looks like a bug elsewhere. * Rename CouchbaseTransactionalIntegrationTests, and check after each test that we're not in a transaction.
1 parent 1701183 commit 1e71f1c

7 files changed

+681
-20
lines changed

src/main/java/org/springframework/data/couchbase/transaction/CouchbaseSimpleCallbackTransactionManager.java

+69-16
Original file line numberDiff line numberDiff line change
@@ -15,39 +15,29 @@
1515
*/
1616
package org.springframework.data.couchbase.transaction;
1717

18-
import com.couchbase.client.core.transaction.CoreTransactionAttemptContext;
1918
import com.couchbase.client.java.transactions.AttemptContextReactiveAccessor;
20-
import com.couchbase.client.java.transactions.ReactiveTransactionAttemptContext;
2119
import com.couchbase.client.java.transactions.TransactionAttemptContext;
2220
import com.couchbase.client.java.transactions.TransactionResult;
2321
import com.couchbase.client.java.transactions.config.TransactionOptions;
2422
import org.slf4j.Logger;
2523
import org.slf4j.LoggerFactory;
26-
import org.springframework.data.couchbase.CouchbaseClientFactory;
2724
import org.springframework.data.couchbase.ReactiveCouchbaseClientFactory;
2825
import org.springframework.lang.Nullable;
2926
import org.springframework.transaction.IllegalTransactionStateException;
30-
import org.springframework.transaction.InvalidTimeoutException;
31-
import org.springframework.transaction.PlatformTransactionManager;
3227
import org.springframework.transaction.TransactionDefinition;
3328
import org.springframework.transaction.TransactionException;
3429
import org.springframework.transaction.TransactionStatus;
35-
import org.springframework.transaction.reactive.TransactionContextManager;
36-
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
3730
import org.springframework.transaction.support.CallbackPreferringPlatformTransactionManager;
3831
import org.springframework.transaction.support.DefaultTransactionStatus;
3932
import org.springframework.transaction.support.TransactionCallback;
4033
import org.springframework.transaction.support.TransactionSynchronizationManager;
41-
import org.springframework.util.Assert;
42-
import reactor.core.publisher.Mono;
4334

44-
import java.lang.reflect.Field;
4535
import java.time.Duration;
4636
import java.util.concurrent.atomic.AtomicReference;
4737

4838
public class CouchbaseSimpleCallbackTransactionManager implements CallbackPreferringPlatformTransactionManager {
4939

50-
private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseTransactionManager.class);
40+
private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseSimpleCallbackTransactionManager.class);
5141

5242
private final ReactiveCouchbaseClientFactory couchbaseClientFactory;
5343
private TransactionOptions options;
@@ -59,19 +49,29 @@ public CouchbaseSimpleCallbackTransactionManager(ReactiveCouchbaseClientFactory
5949

6050
@Override
6151
public <T> T execute(TransactionDefinition definition, TransactionCallback<T> callback) throws TransactionException {
62-
final AtomicReference<T> execResult = new AtomicReference<>();
52+
boolean createNewTransaction = handlePropagation(definition);
6353

6454
setOptionsFromDefinition(definition);
6555

56+
if (createNewTransaction) {
57+
return executeNewTransaction(callback);
58+
}
59+
else {
60+
return callback.doInTransaction(null);
61+
}
62+
}
63+
64+
private <T> T executeNewTransaction(TransactionCallback<T> callback) {
65+
final AtomicReference<T> execResult = new AtomicReference<>();
66+
6667
TransactionResult result = couchbaseClientFactory.getCluster().block().transactions().run(ctx -> {
6768
CouchbaseTransactionStatus status = new CouchbaseTransactionStatus(null, true, false, false, true, null, null);
6869

6970
populateTransactionSynchronizationManager(ctx);
7071

7172
try {
7273
execResult.set(callback.doInTransaction(status));
73-
}
74-
finally {
74+
} finally {
7575
TransactionSynchronizationManager.clear();
7676
}
7777
}, this.options);
@@ -81,6 +81,61 @@ public <T> T execute(TransactionDefinition definition, TransactionCallback<T> ca
8181
return execResult.get();
8282
}
8383

84+
// Propagation defines what happens when a @Transactional method is called from another @Transactional method.
85+
private boolean handlePropagation(TransactionDefinition definition) {
86+
boolean isExistingTransaction = TransactionSynchronizationManager.isActualTransactionActive();
87+
88+
LOGGER.trace("Deciding propagation behaviour from {} and {}", definition.getPropagationBehavior(), isExistingTransaction);
89+
90+
switch (definition.getPropagationBehavior()) {
91+
case TransactionDefinition.PROPAGATION_REQUIRED:
92+
// Make a new transaction if required, else just execute the new method in the current transaction.
93+
return !isExistingTransaction;
94+
95+
case TransactionDefinition.PROPAGATION_SUPPORTS:
96+
// Don't appear to have the ability to execute the callback non-transactionally in this layer.
97+
throw new IllegalTransactionStateException(
98+
"Propagation level 'support' has been specified which is not supported");
99+
100+
case TransactionDefinition.PROPAGATION_MANDATORY:
101+
if (!isExistingTransaction) {
102+
throw new IllegalTransactionStateException(
103+
"Propagation level 'mandatory' is specified but not in an active transaction");
104+
}
105+
return false;
106+
107+
case TransactionDefinition.PROPAGATION_REQUIRES_NEW:
108+
// This requires suspension of the active transaction. This will be possible to support in a future
109+
// release, if required.
110+
throw new IllegalTransactionStateException(
111+
"Propagation level 'requires_new' has been specified which is not currently supported");
112+
113+
case TransactionDefinition.PROPAGATION_NOT_SUPPORTED:
114+
// Don't appear to have the ability to execute the callback non-transactionally in this layer.
115+
throw new IllegalTransactionStateException(
116+
"Propagation level 'not_supported' has been specified which is not supported");
117+
118+
case TransactionDefinition.PROPAGATION_NEVER:
119+
if (isExistingTransaction) {
120+
throw new IllegalTransactionStateException(
121+
"Existing transaction found for transaction marked with propagation 'never'");
122+
}
123+
return true;
124+
125+
case TransactionDefinition.PROPAGATION_NESTED:
126+
if (isExistingTransaction) {
127+
// Couchbase transactions cannot be nested.
128+
throw new IllegalTransactionStateException(
129+
"Propagation level 'nested' has been specified which is not supported");
130+
}
131+
return true;
132+
133+
default:
134+
throw new IllegalTransactionStateException(
135+
"Unknown propagation level " + definition.getPropagationBehavior() + " has been specified");
136+
}
137+
}
138+
84139
/**
85140
* @param definition reflects the @Transactional options
86141
*/
@@ -96,8 +151,6 @@ private void setOptionsFromDefinition(TransactionDefinition definition) {
96151
}
97152

98153
// readonly is ignored as it is documented as being a hint that won't necessarily cause writes to fail
99-
100-
// todo gpx what about propagation?
101154
}
102155

103156
}

src/main/java/org/springframework/data/couchbase/transaction/CouchbaseTransactionalOperator.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import com.couchbase.client.java.transactions.ReactiveTransactionAttemptContext;
77
import com.couchbase.client.java.transactions.TransactionGetResult;
88
import com.couchbase.client.java.transactions.TransactionResult;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
911
import org.springframework.data.couchbase.core.CouchbaseTemplate;
1012
import org.springframework.transaction.ReactiveTransaction;
1113
import org.springframework.transaction.TransactionException;
@@ -38,10 +40,11 @@
3840
* what it finds in the currentContext()?
3941
*
4042
*/
43+
// todo gpx ongoing discussions on whether this can support retries & error handling natively
4144
public class CouchbaseTransactionalOperator implements TransactionalOperator {
4245

4346
// package org.springframework.transaction.reactive;
44-
private static final Log logger = LogFactory.getLog(CouchbaseTransactionalOperator.class);
47+
private static final Logger logger = LoggerFactory.getLogger(CouchbaseTransactionalOperator.class);
4548
private final ReactiveTransactionManager transactionManager;
4649
private final TransactionDefinition transactionDefinition;
4750

src/test/java/org/springframework/data/couchbase/domain/UserRepository.java

+3
Original file line numberDiff line numberDiff line change
@@ -67,4 +67,7 @@ default List<User> getByFirstname(String firstname) {
6767
} catch (InterruptedException ie) {}
6868
return findByFirstname(firstname);
6969
}
70+
71+
@Override
72+
User save(User user);
7073
}

0 commit comments

Comments
 (0)