diff --git a/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java b/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java index 76436be5c..9cbbe31c9 100644 --- a/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java +++ b/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java @@ -78,8 +78,7 @@ public Mono save(T entity, String... scopeAndCollection) { String scope = scopeAndCollection.length > 0 ? scopeAndCollection[0] : null; String collection = scopeAndCollection.length > 1 ? scopeAndCollection[1] : null; - return Mono.deferContextual(xxx -> { - Mono result; + return Mono.defer(() -> { final CouchbasePersistentEntity mapperEntity = getConverter().getMappingContext() .getPersistentEntity(entity.getClass()); final CouchbasePersistentProperty versionProperty = mapperEntity.getVersionProperty(); @@ -94,28 +93,29 @@ public Mono save(T entity, String... scopeAndCollection) { if (!versionPresent) { // the entity doesn't have a version property // No version field - no cas // If in a transaction, insert is the only thing that will work - if (TransactionalSupport.checkForTransactionInThreadLocalStorage(xxx) - .isPresent()) { - result = (Mono) insertById(clazz).inScope(scope) - .inCollection(collection) - .one(entity); - } else { // if not in a tx, then upsert will work - result = (Mono) upsertById(clazz).inScope(scope) - .inCollection(collection) - .one(entity); - } + return TransactionalSupport.checkForTransactionInThreadLocalStorage() + .flatMap(ctx -> { + if (ctx.isPresent()) { + return (Mono) insertById(clazz).inScope(scope) + .inCollection(collection) + .one(entity); + } else { // if not in a tx, then upsert will work + return (Mono) upsertById(clazz).inScope(scope) + .inCollection(collection) + .one(entity); + } + }); } else if (existingDocument) { // there is a version property, and it is non-zero // Updating existing document with cas - result = (Mono) replaceById(clazz).inScope(scope) + return (Mono) replaceById(clazz).inScope(scope) .inCollection(collection) .one(entity); } else { // there is a version property, but it's zero or not set. // Creating new document - result = (Mono) insertById(clazz).inScope(scope) + return (Mono) insertById(clazz).inScope(scope) .inCollection(collection) .one(entity); } - return result; }); } diff --git a/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/TransactionalSupport.java b/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/TransactionalSupport.java index 4614a240d..3f5da9564 100644 --- a/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/TransactionalSupport.java +++ b/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/TransactionalSupport.java @@ -52,14 +52,6 @@ public static Mono> checkForTransactionInThrea }); } - public static Optional checkForTransactionInThreadLocalStorage(ContextView ctx) { - return Optional.ofNullable(ctx.hasKey(TransactionMarker.class) ? new CouchbaseResourceHolder(ctx.get(TransactionMarker.class).context()) : null); - } - - //public static Optional blockingCheckForTransactionInThreadLocalStorage() { - // return TransactionMarkerOwner.marker; - // } - public static Mono verifyNotInTransaction(String methodName) { return checkForTransactionInThreadLocalStorage().flatMap(s -> { if (s.isPresent()) { diff --git a/spring-data-couchbase/src/test/java/org/springframework/data/couchbase/transactions/sdk/SDKTransactionsNonBlockingThreadIntegrationTests.java b/spring-data-couchbase/src/test/java/org/springframework/data/couchbase/transactions/sdk/SDKTransactionsNonBlockingThreadIntegrationTests.java new file mode 100644 index 000000000..6444803ab --- /dev/null +++ b/spring-data-couchbase/src/test/java/org/springframework/data/couchbase/transactions/sdk/SDKTransactionsNonBlockingThreadIntegrationTests.java @@ -0,0 +1,91 @@ +/* + * Copyright 2022 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.data.couchbase.transactions.sdk; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.springframework.data.couchbase.transactions.util.TransactionTestUtil.assertNotInTransaction; + +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.couchbase.CouchbaseClientFactory; +import org.springframework.data.couchbase.core.CouchbaseTemplate; +import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate; +import org.springframework.data.couchbase.domain.Person; +import org.springframework.data.couchbase.transactions.TransactionsConfig; +import org.springframework.data.couchbase.util.Capabilities; +import org.springframework.data.couchbase.util.ClusterType; +import org.springframework.data.couchbase.util.IgnoreWhen; +import org.springframework.data.couchbase.util.JavaIntegrationTests; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +/** + * Added for issue 1527: Tests running regular SDK transactions (blocking and reactive) on a reactor non-blocking + * thread. + * + * @author Graham Pople + */ +@IgnoreWhen(missesCapabilities = Capabilities.QUERY, clusterTypes = ClusterType.MOCKED) +@SpringJUnitConfig(TransactionsConfig.class) +public class SDKTransactionsNonBlockingThreadIntegrationTests extends JavaIntegrationTests { + @Autowired private CouchbaseClientFactory couchbaseClientFactory; + @Autowired private CouchbaseTemplate ops; + @Autowired private ReactiveCouchbaseTemplate reactiveOps; + + @BeforeEach + public void beforeEachTest() { + assertNotInTransaction(); + } + + @AfterEach + public void afterEachTest() { + assertNotInTransaction(); + } + + @DisplayName("Trying to run a blocking transaction (or anything blocking) on a non-blocking thread, will not work") + @Test + public void blockingTransactionOnNonBlockingThread() { + try { + Mono.just(1).publishOn(Schedulers.parallel()).flatMap(ignore -> { + assertTrue(Schedulers.isInNonBlockingThread()); + assertTrue(Thread.currentThread().getName().contains("parallel")); + + couchbaseClientFactory.getCluster().transactions().run(ctx -> { + ops.insertById(Person.class).one(new Person("Walter", "White")); + }); + return Mono.empty(); + }).block(); + fail(); + } catch (IllegalStateException ignored) {} + } + + @DisplayName("Trying to run a reactive transaction on a non-blocking thread should work") + @Test + public void reactiveTransactionOnNonBlockingThread() { + Mono.just(1).publishOn(Schedulers.parallel()).flatMap(ignore -> { + return couchbaseClientFactory.getCluster().reactive().transactions().run(ctx -> { + return reactiveOps.insertById(Person.class).one(new Person("Walter", "White")); + }); + }).block(); + } +} diff --git a/spring-data-couchbase/src/test/java/org/springframework/data/couchbase/transactions/sdk/SDKTransactionsSaveIntegrationTests.java b/spring-data-couchbase/src/test/java/org/springframework/data/couchbase/transactions/sdk/SDKTransactionsSaveIntegrationTests.java new file mode 100644 index 000000000..604504fb9 --- /dev/null +++ b/spring-data-couchbase/src/test/java/org/springframework/data/couchbase/transactions/sdk/SDKTransactionsSaveIntegrationTests.java @@ -0,0 +1,101 @@ +/* + * Copyright 2022 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.data.couchbase.transactions.sdk; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.springframework.data.couchbase.transactions.util.TransactionTestUtil.assertNotInTransaction; + +import org.springframework.data.couchbase.domain.PersonWithoutVersion; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.couchbase.CouchbaseClientFactory; +import org.springframework.data.couchbase.core.CouchbaseTemplate; +import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate; +import org.springframework.data.couchbase.domain.Person; +import org.springframework.data.couchbase.transactions.TransactionsConfig; +import org.springframework.data.couchbase.util.Capabilities; +import org.springframework.data.couchbase.util.ClusterType; +import org.springframework.data.couchbase.util.IgnoreWhen; +import org.springframework.data.couchbase.util.JavaIntegrationTests; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +/** + * Added for issue 1527: Tests the .save() command. + * + * @author Graham Pople + */ +@IgnoreWhen(missesCapabilities = Capabilities.QUERY, clusterTypes = ClusterType.MOCKED) +@SpringJUnitConfig(TransactionsConfig.class) +public class SDKTransactionsSaveIntegrationTests extends JavaIntegrationTests { + @Autowired private CouchbaseClientFactory couchbaseClientFactory; + @Autowired private CouchbaseTemplate ops; + @Autowired private ReactiveCouchbaseTemplate reactiveOps; + + @BeforeEach + public void beforeEachTest() { + assertNotInTransaction(); + } + + @AfterEach + public void afterEachTest() { + assertNotInTransaction(); + } + + @DisplayName("ReactiveCouchbaseTemplate.save() called inside a reactive SDK transaction should work") + @Test + public void reactiveSaveInReactiveTransaction() { + couchbaseClientFactory.getCluster().reactive().transactions().run(ctx -> { + PersonWithoutVersion p = new PersonWithoutVersion("Walter", "White"); + return reactiveOps.save(p); + }).block(); + } + + @DisplayName("ReactiveCouchbaseTemplate.save().block() called inside a non-reactive SDK transaction should work") + @Test + public void reactiveSaveInBlockingTransaction() { + couchbaseClientFactory.getCluster().transactions().run(ctx -> { + PersonWithoutVersion p = new PersonWithoutVersion("Walter", "White"); + reactiveOps.save(p).block(); + }); + } + + @DisplayName("ReactiveCouchbaseTemplate.save() called inside a reactive SDK transaction should work") + @Test + public void blockingSaveInReactiveTransaction() { + couchbaseClientFactory.getCluster().reactive().transactions().run(ctx -> { + PersonWithoutVersion p = new PersonWithoutVersion("Walter", "White"); + ops.save(p); + return Mono.empty(); + }).block(); + } + + @DisplayName("ReactiveCouchbaseTemplate.save().block() called inside a non-reactive SDK transaction should work") + @Test + public void blockingSaveInBlockingTransaction() { + couchbaseClientFactory.getCluster().transactions().run(ctx -> { + PersonWithoutVersion p = new PersonWithoutVersion("Walter", "White"); + ops.save(p); + }); + } +}