Skip to content

Datacouch 1145 transaction support #1447

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,39 +15,29 @@
*/
package org.springframework.data.couchbase.transaction;

import com.couchbase.client.core.transaction.CoreTransactionAttemptContext;
import com.couchbase.client.java.transactions.AttemptContextReactiveAccessor;
import com.couchbase.client.java.transactions.ReactiveTransactionAttemptContext;
import com.couchbase.client.java.transactions.TransactionAttemptContext;
import com.couchbase.client.java.transactions.TransactionResult;
import com.couchbase.client.java.transactions.config.TransactionOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.couchbase.CouchbaseClientFactory;
import org.springframework.data.couchbase.ReactiveCouchbaseClientFactory;
import org.springframework.lang.Nullable;
import org.springframework.transaction.IllegalTransactionStateException;
import org.springframework.transaction.InvalidTimeoutException;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.reactive.TransactionContextManager;
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
import org.springframework.transaction.support.CallbackPreferringPlatformTransactionManager;
import org.springframework.transaction.support.DefaultTransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
import reactor.core.publisher.Mono;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;

public class CouchbaseSimpleCallbackTransactionManager implements CallbackPreferringPlatformTransactionManager {

private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseTransactionManager.class);
private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseSimpleCallbackTransactionManager.class);

private final ReactiveCouchbaseClientFactory couchbaseClientFactory;
private TransactionOptions options;
Expand All @@ -59,19 +49,29 @@ public CouchbaseSimpleCallbackTransactionManager(ReactiveCouchbaseClientFactory

@Override
public <T> T execute(TransactionDefinition definition, TransactionCallback<T> callback) throws TransactionException {
final AtomicReference<T> execResult = new AtomicReference<>();
boolean createNewTransaction = handlePropagation(definition);

setOptionsFromDefinition(definition);

if (createNewTransaction) {
return executeNewTransaction(callback);
}
else {
return callback.doInTransaction(null);
}
}

private <T> T executeNewTransaction(TransactionCallback<T> callback) {
final AtomicReference<T> execResult = new AtomicReference<>();

TransactionResult result = couchbaseClientFactory.getCluster().block().transactions().run(ctx -> {
CouchbaseTransactionStatus status = new CouchbaseTransactionStatus(null, true, false, false, true, null, null);

populateTransactionSynchronizationManager(ctx);

try {
execResult.set(callback.doInTransaction(status));
}
finally {
} finally {
TransactionSynchronizationManager.clear();
}
}, this.options);
Expand All @@ -81,6 +81,61 @@ public <T> T execute(TransactionDefinition definition, TransactionCallback<T> ca
return execResult.get();
}

// Propagation defines what happens when a @Transactional method is called from another @Transactional method.
private boolean handlePropagation(TransactionDefinition definition) {
boolean isExistingTransaction = TransactionSynchronizationManager.isActualTransactionActive();

LOGGER.trace("Deciding propagation behaviour from {} and {}", definition.getPropagationBehavior(), isExistingTransaction);

switch (definition.getPropagationBehavior()) {
case TransactionDefinition.PROPAGATION_REQUIRED:
// Make a new transaction if required, else just execute the new method in the current transaction.
return !isExistingTransaction;

case TransactionDefinition.PROPAGATION_SUPPORTS:
// Don't appear to have the ability to execute the callback non-transactionally in this layer.
throw new IllegalTransactionStateException(
"Propagation level 'support' has been specified which is not supported");

case TransactionDefinition.PROPAGATION_MANDATORY:
if (!isExistingTransaction) {
throw new IllegalTransactionStateException(
"Propagation level 'mandatory' is specified but not in an active transaction");
}
return false;

case TransactionDefinition.PROPAGATION_REQUIRES_NEW:
// This requires suspension of the active transaction. This will be possible to support in a future
// release, if required.
throw new IllegalTransactionStateException(
"Propagation level 'requires_new' has been specified which is not currently supported");

case TransactionDefinition.PROPAGATION_NOT_SUPPORTED:
// Don't appear to have the ability to execute the callback non-transactionally in this layer.
throw new IllegalTransactionStateException(
"Propagation level 'not_supported' has been specified which is not supported");

case TransactionDefinition.PROPAGATION_NEVER:
if (isExistingTransaction) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
return true;

case TransactionDefinition.PROPAGATION_NESTED:
if (isExistingTransaction) {
// Couchbase transactions cannot be nested.
throw new IllegalTransactionStateException(
"Propagation level 'nested' has been specified which is not supported");
}
return true;

default:
throw new IllegalTransactionStateException(
"Unknown propagation level " + definition.getPropagationBehavior() + " has been specified");
}
}

/**
* @param definition reflects the @Transactional options
*/
Expand All @@ -96,8 +151,6 @@ private void setOptionsFromDefinition(TransactionDefinition definition) {
}

// readonly is ignored as it is documented as being a hint that won't necessarily cause writes to fail

// todo gpx what about propagation?
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import com.couchbase.client.java.transactions.ReactiveTransactionAttemptContext;
import com.couchbase.client.java.transactions.TransactionGetResult;
import com.couchbase.client.java.transactions.TransactionResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.couchbase.core.CouchbaseTemplate;
import org.springframework.transaction.ReactiveTransaction;
import org.springframework.transaction.TransactionException;
Expand Down Expand Up @@ -38,10 +40,11 @@
* what it finds in the currentContext()?
*
*/
// todo gpx ongoing discussions on whether this can support retries & error handling natively
public class CouchbaseTransactionalOperator implements TransactionalOperator {

// package org.springframework.transaction.reactive;
private static final Log logger = LogFactory.getLog(CouchbaseTransactionalOperator.class);
private static final Logger logger = LoggerFactory.getLogger(CouchbaseTransactionalOperator.class);
private final ReactiveTransactionManager transactionManager;
private final TransactionDefinition transactionDefinition;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,7 @@ default List<User> getByFirstname(String firstname) {
} catch (InterruptedException ie) {}
return findByFirstname(firstname);
}

@Override
User save(User user);
}
Loading