diff --git a/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java b/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java index 18d2951e5..9cbbe31c9 100644 --- a/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java +++ b/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java @@ -59,7 +59,7 @@ public ReactiveCouchbaseTemplate(final CouchbaseClientFactory clientFactory, fin } public ReactiveCouchbaseTemplate(final CouchbaseClientFactory clientFactory, final CouchbaseConverter converter, - final TranslationService translationService) { + final TranslationService translationService) { this(clientFactory, converter, translationService, null); } @@ -75,35 +75,48 @@ public ReactiveCouchbaseTemplate(final CouchbaseClientFactory clientFactory, fin @Override public Mono save(T entity, String... scopeAndCollection) { Assert.notNull(entity, "Entity must not be null!"); + String scope = scopeAndCollection.length > 0 ? scopeAndCollection[0] : null; String collection = scopeAndCollection.length > 1 ? scopeAndCollection[1] : null; - Mono result; - final CouchbasePersistentEntity mapperEntity = getConverter().getMappingContext() - .getPersistentEntity(entity.getClass()); - final CouchbasePersistentProperty versionProperty = mapperEntity.getVersionProperty(); - final boolean versionPresent = versionProperty != null; - final Long version = versionProperty == null || versionProperty.getField() == null ? null - : (Long) ReflectionUtils.getField(versionProperty.getField(), entity); - final boolean existingDocument = version != null && version > 0; - - Class clazz = entity.getClass(); - - if (!versionPresent) { // the entity doesn't have a version property - // No version field - no cas - // If in a transaction, insert is the only thing that will work - if (TransactionalSupport.checkForTransactionInThreadLocalStorage().block().isPresent()) { - result = (Mono) insertById(clazz).inScope(scope).inCollection(collection).one(entity); - } else { // if not in a tx, then upsert will work - result = (Mono) upsertById(clazz).inScope(scope).inCollection(collection).one(entity); + return Mono.defer(() -> { + final CouchbasePersistentEntity mapperEntity = getConverter().getMappingContext() + .getPersistentEntity(entity.getClass()); + final CouchbasePersistentProperty versionProperty = mapperEntity.getVersionProperty(); + final boolean versionPresent = versionProperty != null; + final Long version = versionProperty == null || versionProperty.getField() == null ? null + : (Long) ReflectionUtils.getField(versionProperty.getField(), + entity); + final boolean existingDocument = version != null && version > 0; + + Class clazz = entity.getClass(); + + if (!versionPresent) { // the entity doesn't have a version property + // No version field - no cas + // If in a transaction, insert is the only thing that will work + return TransactionalSupport.checkForTransactionInThreadLocalStorage() + .flatMap(ctx -> { + if (ctx.isPresent()) { + return (Mono) insertById(clazz).inScope(scope) + .inCollection(collection) + .one(entity); + } else { // if not in a tx, then upsert will work + return (Mono) upsertById(clazz).inScope(scope) + .inCollection(collection) + .one(entity); + } + }); + } else if (existingDocument) { // there is a version property, and it is non-zero + // Updating existing document with cas + return (Mono) replaceById(clazz).inScope(scope) + .inCollection(collection) + .one(entity); + } else { // there is a version property, but it's zero or not set. + // Creating new document + return (Mono) insertById(clazz).inScope(scope) + .inCollection(collection) + .one(entity); } - } else if (existingDocument) { // there is a version property, and it is non-zero - // Updating existing document with cas - result = (Mono) replaceById(clazz).inScope(scope).inCollection(collection).one(entity); - } else { // there is a version property, but it's zero or not set. - // Creating new document - result = (Mono) insertById(clazz).inScope(scope).inCollection(collection).one(entity); - } - return result; + }); } public Mono count(Query query, Class domainType) { diff --git a/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/TransactionalSupport.java b/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/TransactionalSupport.java index 736424796..3f5da9564 100644 --- a/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/TransactionalSupport.java +++ b/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/TransactionalSupport.java @@ -52,18 +52,10 @@ public static Mono> checkForTransactionInThrea }); } - public static Optional checkForTransactionInThreadLocalStorage(ContextView ctx) { - return Optional.ofNullable(ctx.hasKey(TransactionMarker.class) ? new CouchbaseResourceHolder(ctx.get(TransactionMarker.class).context()) : null); - } - - //public static Optional blockingCheckForTransactionInThreadLocalStorage() { - // return TransactionMarkerOwner.marker; - // } - public static Mono verifyNotInTransaction(String methodName) { return checkForTransactionInThreadLocalStorage().flatMap(s -> { if (s.isPresent()) { - return Mono.error(new IllegalArgumentException(methodName + "can not be used inside a transaction")); + return Mono.error(new IllegalArgumentException(methodName + " can not be used inside a transaction")); } else { return Mono.empty(); } diff --git a/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/mapping/CouchbaseList.java b/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/mapping/CouchbaseList.java index d7cd4b8a1..777e0fe11 100644 --- a/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/mapping/CouchbaseList.java +++ b/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/mapping/CouchbaseList.java @@ -202,7 +202,7 @@ private void verifyValueType(final Object value) { } throw new IllegalArgumentException( - "Attribute of type " + clazz.getCanonicalName() + "can not be stored and must be converted."); + "Attribute of type " + clazz.getCanonicalName() + " can not be stored and must be converted."); } /** diff --git a/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseCallbackTransactionManager.java b/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseCallbackTransactionManager.java index 1da999f66..40dd2595e 100644 --- a/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseCallbackTransactionManager.java +++ b/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseCallbackTransactionManager.java @@ -44,7 +44,6 @@ import com.couchbase.client.java.transactions.config.TransactionOptions; import com.couchbase.client.java.transactions.error.TransactionCommitAmbiguousException; import com.couchbase.client.java.transactions.error.TransactionFailedException; -import reactor.util.context.ContextView; /** * The Couchbase transaction manager, providing support for @Transactional methods. @@ -74,7 +73,9 @@ public CouchbaseCallbackTransactionManager(CouchbaseClientFactory couchbaseClien @Override public T execute(TransactionDefinition definition, TransactionCallback callback) throws TransactionException { - boolean createNewTransaction = handlePropagation(definition, null); + boolean isInExistingTransaction = TransactionalSupport.checkForTransactionInThreadLocalStorage().block() + .isPresent(); + boolean createNewTransaction = handlePropagation(definition, isInExistingTransaction); setOptionsFromDefinition(definition); @@ -88,8 +89,9 @@ public T execute(TransactionDefinition definition, TransactionCallback ca @Stability.Internal Flux executeReactive(TransactionDefinition definition, org.springframework.transaction.reactive.TransactionCallback callback) { - return Flux.deferContextual((ctx) -> { - boolean createNewTransaction = handlePropagation(definition, ctx); + return TransactionalSupport.checkForTransactionInThreadLocalStorage().flatMapMany(isInTransaction -> { + boolean isInExistingTransaction = isInTransaction.isPresent(); + boolean createNewTransaction = handlePropagation(definition, isInExistingTransaction); setOptionsFromDefinition(definition); @@ -188,9 +190,7 @@ public boolean isCompleted() { } // Propagation defines what happens when a @Transactional method is called from another @Transactional method. - private boolean handlePropagation(TransactionDefinition definition, ContextView ctx) { - boolean isExistingTransaction = ctx != null ? TransactionalSupport.checkForTransactionInThreadLocalStorage(ctx).isPresent() : - TransactionalSupport.checkForTransactionInThreadLocalStorage().block().isPresent(); + private Boolean handlePropagation(TransactionDefinition definition, boolean isExistingTransaction) { LOGGER.trace("Deciding propagation behaviour from {} and {}", definition.getPropagationBehavior(), isExistingTransaction); diff --git a/spring-data-couchbase/src/test/java/org/springframework/data/couchbase/transactions/CouchbasePersonTransactionReactiveIntegrationTests.java b/spring-data-couchbase/src/test/java/org/springframework/data/couchbase/transactions/CouchbasePersonTransactionReactiveIntegrationTests.java index 9ad050e6b..0fe30f476 100644 --- a/spring-data-couchbase/src/test/java/org/springframework/data/couchbase/transactions/CouchbasePersonTransactionReactiveIntegrationTests.java +++ b/spring-data-couchbase/src/test/java/org/springframework/data/couchbase/transactions/CouchbasePersonTransactionReactiveIntegrationTests.java @@ -19,6 +19,7 @@ import static com.couchbase.client.java.query.QueryScanConsistency.REQUEST_PLUS; import lombok.Data; +import org.springframework.data.couchbase.domain.PersonWithoutVersion; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -49,9 +50,9 @@ /** * todo gp: these tests are using the `.as(transactionalOperator::transactional)` method which is for the chopping - * block, so presumably these tests are too - * todo mr: I'm not sure how as(transactionalOperator::transactional) is different than - * todo mr: transactionOperator.transaction(...)in CouchbaseTransactionalOperatorTemplateIntegrationTests ? + * block, so presumably these tests are too todo mr: I'm not sure how as(transactionalOperator::transactional) is + * different than todo mr: transactionOperator.transaction(...)in CouchbaseTransactionalOperatorTemplateIntegrationTests + * ? * * @author Michael Reiche */ @@ -72,6 +73,7 @@ public class CouchbasePersonTransactionReactiveIntegrationTests extends JavaInte String sName = "_default"; String cName = "_default"; Person WalterWhite; + PersonWithoutVersion BobbyBlackWithoutVersion; @BeforeAll public static void beforeAll() { @@ -86,6 +88,7 @@ public static void afterAll() { @BeforeEach public void beforeEachTest() { WalterWhite = new Person("Walter", "White"); + BobbyBlackWithoutVersion = new PersonWithoutVersion("Bobby", "Black"); TransactionTestUtil.assertNotInTransaction(); List pr = operations.removeByQuery(Person.class).withConsistency(REQUEST_PLUS).all().collectList() .block(); @@ -140,6 +143,12 @@ public void commitShouldPersistTxEntriesOfTxAnnotatedMethod() { } + @Test + public void commitShouldPersistTxEntriesOfTxAnnotatedMethodNoVersion() { + personService.declarativeSavePersonWithoutVersion(BobbyBlackWithoutVersion).as(StepVerifier::create) // + .expectError(UnsupportedOperationException.class); // + } + @Test public void commitShouldPersistTxEntriesAcrossCollections() { diff --git a/spring-data-couchbase/src/test/java/org/springframework/data/couchbase/transactions/PersonServiceReactive.java b/spring-data-couchbase/src/test/java/org/springframework/data/couchbase/transactions/PersonServiceReactive.java index d80208ac0..dcbf37b09 100644 --- a/spring-data-couchbase/src/test/java/org/springframework/data/couchbase/transactions/PersonServiceReactive.java +++ b/spring-data-couchbase/src/test/java/org/springframework/data/couchbase/transactions/PersonServiceReactive.java @@ -20,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import org.springframework.data.couchbase.core.TransactionalSupport; +import org.springframework.data.couchbase.domain.PersonWithoutVersion; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -109,6 +110,11 @@ public Mono declarativeSavePerson(Person person) { return personOperationsRx.save(person); } + @Transactional + public Mono declarativeSavePersonWithoutVersion(PersonWithoutVersion person) { + return personOperationsRx.save(person); + } + @Transactional public Mono declarativeSavePersonErrors(Person person) { return personOperationsRx.insertById(Person.class).one(person) diff --git a/spring-data-couchbase/src/test/java/org/springframework/data/couchbase/transactions/sdk/SDKTransactionsNonBlockingThreadIntegrationTests.java b/spring-data-couchbase/src/test/java/org/springframework/data/couchbase/transactions/sdk/SDKTransactionsNonBlockingThreadIntegrationTests.java new file mode 100644 index 000000000..6444803ab --- /dev/null +++ b/spring-data-couchbase/src/test/java/org/springframework/data/couchbase/transactions/sdk/SDKTransactionsNonBlockingThreadIntegrationTests.java @@ -0,0 +1,91 @@ +/* + * Copyright 2022 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.data.couchbase.transactions.sdk; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.springframework.data.couchbase.transactions.util.TransactionTestUtil.assertNotInTransaction; + +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.couchbase.CouchbaseClientFactory; +import org.springframework.data.couchbase.core.CouchbaseTemplate; +import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate; +import org.springframework.data.couchbase.domain.Person; +import org.springframework.data.couchbase.transactions.TransactionsConfig; +import org.springframework.data.couchbase.util.Capabilities; +import org.springframework.data.couchbase.util.ClusterType; +import org.springframework.data.couchbase.util.IgnoreWhen; +import org.springframework.data.couchbase.util.JavaIntegrationTests; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +/** + * Added for issue 1527: Tests running regular SDK transactions (blocking and reactive) on a reactor non-blocking + * thread. + * + * @author Graham Pople + */ +@IgnoreWhen(missesCapabilities = Capabilities.QUERY, clusterTypes = ClusterType.MOCKED) +@SpringJUnitConfig(TransactionsConfig.class) +public class SDKTransactionsNonBlockingThreadIntegrationTests extends JavaIntegrationTests { + @Autowired private CouchbaseClientFactory couchbaseClientFactory; + @Autowired private CouchbaseTemplate ops; + @Autowired private ReactiveCouchbaseTemplate reactiveOps; + + @BeforeEach + public void beforeEachTest() { + assertNotInTransaction(); + } + + @AfterEach + public void afterEachTest() { + assertNotInTransaction(); + } + + @DisplayName("Trying to run a blocking transaction (or anything blocking) on a non-blocking thread, will not work") + @Test + public void blockingTransactionOnNonBlockingThread() { + try { + Mono.just(1).publishOn(Schedulers.parallel()).flatMap(ignore -> { + assertTrue(Schedulers.isInNonBlockingThread()); + assertTrue(Thread.currentThread().getName().contains("parallel")); + + couchbaseClientFactory.getCluster().transactions().run(ctx -> { + ops.insertById(Person.class).one(new Person("Walter", "White")); + }); + return Mono.empty(); + }).block(); + fail(); + } catch (IllegalStateException ignored) {} + } + + @DisplayName("Trying to run a reactive transaction on a non-blocking thread should work") + @Test + public void reactiveTransactionOnNonBlockingThread() { + Mono.just(1).publishOn(Schedulers.parallel()).flatMap(ignore -> { + return couchbaseClientFactory.getCluster().reactive().transactions().run(ctx -> { + return reactiveOps.insertById(Person.class).one(new Person("Walter", "White")); + }); + }).block(); + } +} diff --git a/spring-data-couchbase/src/test/java/org/springframework/data/couchbase/transactions/sdk/SDKTransactionsSaveIntegrationTests.java b/spring-data-couchbase/src/test/java/org/springframework/data/couchbase/transactions/sdk/SDKTransactionsSaveIntegrationTests.java new file mode 100644 index 000000000..604504fb9 --- /dev/null +++ b/spring-data-couchbase/src/test/java/org/springframework/data/couchbase/transactions/sdk/SDKTransactionsSaveIntegrationTests.java @@ -0,0 +1,101 @@ +/* + * Copyright 2022 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.data.couchbase.transactions.sdk; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.springframework.data.couchbase.transactions.util.TransactionTestUtil.assertNotInTransaction; + +import org.springframework.data.couchbase.domain.PersonWithoutVersion; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.couchbase.CouchbaseClientFactory; +import org.springframework.data.couchbase.core.CouchbaseTemplate; +import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate; +import org.springframework.data.couchbase.domain.Person; +import org.springframework.data.couchbase.transactions.TransactionsConfig; +import org.springframework.data.couchbase.util.Capabilities; +import org.springframework.data.couchbase.util.ClusterType; +import org.springframework.data.couchbase.util.IgnoreWhen; +import org.springframework.data.couchbase.util.JavaIntegrationTests; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +/** + * Added for issue 1527: Tests the .save() command. + * + * @author Graham Pople + */ +@IgnoreWhen(missesCapabilities = Capabilities.QUERY, clusterTypes = ClusterType.MOCKED) +@SpringJUnitConfig(TransactionsConfig.class) +public class SDKTransactionsSaveIntegrationTests extends JavaIntegrationTests { + @Autowired private CouchbaseClientFactory couchbaseClientFactory; + @Autowired private CouchbaseTemplate ops; + @Autowired private ReactiveCouchbaseTemplate reactiveOps; + + @BeforeEach + public void beforeEachTest() { + assertNotInTransaction(); + } + + @AfterEach + public void afterEachTest() { + assertNotInTransaction(); + } + + @DisplayName("ReactiveCouchbaseTemplate.save() called inside a reactive SDK transaction should work") + @Test + public void reactiveSaveInReactiveTransaction() { + couchbaseClientFactory.getCluster().reactive().transactions().run(ctx -> { + PersonWithoutVersion p = new PersonWithoutVersion("Walter", "White"); + return reactiveOps.save(p); + }).block(); + } + + @DisplayName("ReactiveCouchbaseTemplate.save().block() called inside a non-reactive SDK transaction should work") + @Test + public void reactiveSaveInBlockingTransaction() { + couchbaseClientFactory.getCluster().transactions().run(ctx -> { + PersonWithoutVersion p = new PersonWithoutVersion("Walter", "White"); + reactiveOps.save(p).block(); + }); + } + + @DisplayName("ReactiveCouchbaseTemplate.save() called inside a reactive SDK transaction should work") + @Test + public void blockingSaveInReactiveTransaction() { + couchbaseClientFactory.getCluster().reactive().transactions().run(ctx -> { + PersonWithoutVersion p = new PersonWithoutVersion("Walter", "White"); + ops.save(p); + return Mono.empty(); + }).block(); + } + + @DisplayName("ReactiveCouchbaseTemplate.save().block() called inside a non-reactive SDK transaction should work") + @Test + public void blockingSaveInBlockingTransaction() { + couchbaseClientFactory.getCluster().transactions().run(ctx -> { + PersonWithoutVersion p = new PersonWithoutVersion("Walter", "White"); + ops.save(p); + }); + } +}