Skip to content

Fix call to block() in CouchbaseCallbackTransactionManager. #1528

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.springframework.data.couchbase.core;

import com.couchbase.client.core.transaction.threadlocal.TransactionMarker;
import reactor.core.publisher.Mono;

import java.util.Optional;
Expand All @@ -26,6 +27,7 @@
import com.couchbase.client.core.error.transaction.TransactionOperationFailedException;
import com.couchbase.client.core.transaction.CoreTransactionAttemptContext;
import com.couchbase.client.core.transaction.threadlocal.TransactionMarkerOwner;
import reactor.util.context.ContextView;

/**
* Utility methods to support transactions.
Expand All @@ -50,6 +52,14 @@ public static Mono<Optional<CouchbaseResourceHolder>> checkForTransactionInThrea
});
}

public static Optional<CouchbaseResourceHolder> checkForTransactionInThreadLocalStorage(ContextView ctx) {
return Optional.ofNullable(ctx.hasKey(TransactionMarker.class) ? new CouchbaseResourceHolder(ctx.get(TransactionMarker.class).context()) : null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't too great as a) it's not DRY or encapsulated - this functionality would really belong in TransactionMarkerOwner, and b) most importantly, it's removing a critical check. We won't know if we're inside a blocking transaction here, as we don't have the ThreadLocal check.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We won't know if we're inside a blocking transaction here, as we don't have the ThreadLocal check.
Both checks are there, however if the ctx is not due to the transaction, and there is indeed a blocking (ThreadLocal) transaction, we will miss it. Hence the request for a TransactionMarkerOwner api that does require a ctx and return a Mono to get the ThreadLocal core.

	boolean isExistingTransaction = ctx != null ? TransactionalSupport.checkForTransactionInThreadLocalStorage(ctx).isPresent() :
				TransactionalSupport.checkForTransactionInThreadLocalStorage().block().isPresent();

Right. So (a) requires the modification to TransactionMarkerOwner, and (b) needs what I've described in the other comment.

}

//public static Optional<CouchbaseResourceHolder> blockingCheckForTransactionInThreadLocalStorage() {
// return TransactionMarkerOwner.marker;
// }

public static Mono<Void> verifyNotInTransaction(String methodName) {
return checkForTransactionInThreadLocalStorage().flatMap(s -> {
if (s.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.couchbase.client.java.transactions.config.TransactionOptions;
import com.couchbase.client.java.transactions.error.TransactionCommitAmbiguousException;
import com.couchbase.client.java.transactions.error.TransactionFailedException;
import reactor.util.context.ContextView;

/**
* The Couchbase transaction manager, providing support for @Transactional methods.
Expand Down Expand Up @@ -73,7 +74,7 @@ public CouchbaseCallbackTransactionManager(CouchbaseClientFactory couchbaseClien

@Override
public <T> T execute(TransactionDefinition definition, TransactionCallback<T> callback) throws TransactionException {
boolean createNewTransaction = handlePropagation(definition);
boolean createNewTransaction = handlePropagation(definition, null);

setOptionsFromDefinition(definition);

Expand All @@ -87,8 +88,8 @@ public <T> T execute(TransactionDefinition definition, TransactionCallback<T> ca
@Stability.Internal
<T> Flux<T> executeReactive(TransactionDefinition definition,
org.springframework.transaction.reactive.TransactionCallback<T> callback) {
return Flux.defer(() -> {
boolean createNewTransaction = handlePropagation(definition);
return Flux.deferContextual((ctx) -> {
boolean createNewTransaction = handlePropagation(definition, ctx);

setOptionsFromDefinition(definition);

Expand Down Expand Up @@ -187,8 +188,9 @@ public boolean isCompleted() {
}

// Propagation defines what happens when a @Transactional method is called from another @Transactional method.
private boolean handlePropagation(TransactionDefinition definition) {
boolean isExistingTransaction = TransactionalSupport.checkForTransactionInThreadLocalStorage().block().isPresent();
private boolean handlePropagation(TransactionDefinition definition, ContextView ctx) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I meant on JCBC-1988 by "Refactor handlePropagation so we call TransactionalSupport.checkForTransactionInThreadLocalStorage() outside it and pass in the result."
is something like:

executeReactive():
		return TransactionalSupport.checkForTransactionInThreadLocalStorage().flatMap(isInTransaction -> {
		        boolean isInExistingTransaction = isInTransaction.isPresent();
			boolean createNewTransaction = handlePropagation(definition, isInExistingTransaction);
 
execute():
   boolean isInExistingTransaction = TransactionalSupport.checkForTransactionInThreadLocalStorage().block().isPresent();
   boolean createNewTransaction = handlePropagation(definition, isInExistingTransaction);

This preserves the full safety of the existing checkForTransactionInThreadLocalStorage() check (checking whether we are in either a reactive and blocking transaction), and just seems cleaner and simpler than the solution here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the execute() path, block() cannot be called as this may be in a NonBlocking thread.
TransactionMarkerOwner needs an api to get the Core out of ThreadLocal without calling block().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The execute() path is inherently completely blocking (it is the blocking API) and hence can't be called from a NonBlocking thread. If they're trying to do a blocking transaction inside a reactive operator, then that is an application bug.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we get to the bottom of how they ended up on the execute() path from a reactive @transactional? I read through 1527 and I took from it that our initial guess was that they didn't do @EnableTransactionManagement, but in fact they had.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The springframework TransactionInterceptor was being used instead of CouchbaseTranasctionInterceptor because the customer was configuring using yaml for configuration and not a class that extends that AbstractCouchbaseConfiguration like we had been doing. AbstractCouchbaseConfiguration has substitution of CouchbaseTransactionInterceptor.
I think you have a check for that in the code, but it hit the error on block() before the check.

boolean isExistingTransaction = ctx != null ? TransactionalSupport.checkForTransactionInThreadLocalStorage(ctx).isPresent() :
TransactionalSupport.checkForTransactionInThreadLocalStorage().block().isPresent();

LOGGER.trace("Deciding propagation behaviour from {} and {}", definition.getPropagationBehavior(),
isExistingTransaction);
Expand Down