Skip to content

Commit b2c7475

Browse files
authored
Removing the form of checkForTransactionInThreadLocalStorage (#1538)
that does not use TransactionMarkerOwner. As this bypasses a critical check on whether we are inside a blocking transaction. Two new tests in SDKTransactionsSaveIntegrationTests (reactiveSaveInBlockingTransaction and blockingSaveInBlockingTransaction) will fail without this, as the .save() does not realise it's in a transaction. Adding new tests, for .save() and for performing transactions in reactor blocking threads.
1 parent 463fef6 commit b2c7475

File tree

4 files changed

+207
-23
lines changed

4 files changed

+207
-23
lines changed

spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,7 @@ public <T> Mono<T> save(T entity, String... scopeAndCollection) {
7878

7979
String scope = scopeAndCollection.length > 0 ? scopeAndCollection[0] : null;
8080
String collection = scopeAndCollection.length > 1 ? scopeAndCollection[1] : null;
81-
return Mono.deferContextual(xxx -> {
82-
Mono<T> result;
81+
return Mono.defer(() -> {
8382
final CouchbasePersistentEntity<?> mapperEntity = getConverter().getMappingContext()
8483
.getPersistentEntity(entity.getClass());
8584
final CouchbasePersistentProperty versionProperty = mapperEntity.getVersionProperty();
@@ -94,28 +93,29 @@ public <T> Mono<T> save(T entity, String... scopeAndCollection) {
9493
if (!versionPresent) { // the entity doesn't have a version property
9594
// No version field - no cas
9695
// If in a transaction, insert is the only thing that will work
97-
if (TransactionalSupport.checkForTransactionInThreadLocalStorage(xxx)
98-
.isPresent()) {
99-
result = (Mono<T>) insertById(clazz).inScope(scope)
100-
.inCollection(collection)
101-
.one(entity);
102-
} else { // if not in a tx, then upsert will work
103-
result = (Mono<T>) upsertById(clazz).inScope(scope)
104-
.inCollection(collection)
105-
.one(entity);
106-
}
96+
return TransactionalSupport.checkForTransactionInThreadLocalStorage()
97+
.flatMap(ctx -> {
98+
if (ctx.isPresent()) {
99+
return (Mono<T>) insertById(clazz).inScope(scope)
100+
.inCollection(collection)
101+
.one(entity);
102+
} else { // if not in a tx, then upsert will work
103+
return (Mono<T>) upsertById(clazz).inScope(scope)
104+
.inCollection(collection)
105+
.one(entity);
106+
}
107+
});
107108
} else if (existingDocument) { // there is a version property, and it is non-zero
108109
// Updating existing document with cas
109-
result = (Mono<T>) replaceById(clazz).inScope(scope)
110+
return (Mono<T>) replaceById(clazz).inScope(scope)
110111
.inCollection(collection)
111112
.one(entity);
112113
} else { // there is a version property, but it's zero or not set.
113114
// Creating new document
114-
result = (Mono<T>) insertById(clazz).inScope(scope)
115+
return (Mono<T>) insertById(clazz).inScope(scope)
115116
.inCollection(collection)
116117
.one(entity);
117118
}
118-
return result;
119119
});
120120
}
121121

spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/TransactionalSupport.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,6 @@ public static Mono<Optional<CouchbaseResourceHolder>> checkForTransactionInThrea
5252
});
5353
}
5454

55-
public static Optional<CouchbaseResourceHolder> checkForTransactionInThreadLocalStorage(ContextView ctx) {
56-
return Optional.ofNullable(ctx.hasKey(TransactionMarker.class) ? new CouchbaseResourceHolder(ctx.get(TransactionMarker.class).context()) : null);
57-
}
58-
59-
//public static Optional<CouchbaseResourceHolder> blockingCheckForTransactionInThreadLocalStorage() {
60-
// return TransactionMarkerOwner.marker;
61-
// }
62-
6355
public static Mono<Void> verifyNotInTransaction(String methodName) {
6456
return checkForTransactionInThreadLocalStorage().flatMap(s -> {
6557
if (s.isPresent()) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright 2022 the original author or authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.data.couchbase.transactions.sdk;
18+
19+
import static org.junit.jupiter.api.Assertions.assertTrue;
20+
import static org.junit.jupiter.api.Assertions.fail;
21+
import static org.springframework.data.couchbase.transactions.util.TransactionTestUtil.assertNotInTransaction;
22+
23+
import reactor.core.publisher.Mono;
24+
import reactor.core.scheduler.Schedulers;
25+
26+
import org.junit.jupiter.api.AfterEach;
27+
import org.junit.jupiter.api.BeforeEach;
28+
import org.junit.jupiter.api.DisplayName;
29+
import org.junit.jupiter.api.Test;
30+
import org.springframework.beans.factory.annotation.Autowired;
31+
import org.springframework.data.couchbase.CouchbaseClientFactory;
32+
import org.springframework.data.couchbase.core.CouchbaseTemplate;
33+
import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate;
34+
import org.springframework.data.couchbase.domain.Person;
35+
import org.springframework.data.couchbase.transactions.TransactionsConfig;
36+
import org.springframework.data.couchbase.util.Capabilities;
37+
import org.springframework.data.couchbase.util.ClusterType;
38+
import org.springframework.data.couchbase.util.IgnoreWhen;
39+
import org.springframework.data.couchbase.util.JavaIntegrationTests;
40+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
41+
42+
/**
43+
* Added for issue 1527: Tests running regular SDK transactions (blocking and reactive) on a reactor non-blocking
44+
* thread.
45+
*
46+
* @author Graham Pople
47+
*/
48+
@IgnoreWhen(missesCapabilities = Capabilities.QUERY, clusterTypes = ClusterType.MOCKED)
49+
@SpringJUnitConfig(TransactionsConfig.class)
50+
public class SDKTransactionsNonBlockingThreadIntegrationTests extends JavaIntegrationTests {
51+
@Autowired private CouchbaseClientFactory couchbaseClientFactory;
52+
@Autowired private CouchbaseTemplate ops;
53+
@Autowired private ReactiveCouchbaseTemplate reactiveOps;
54+
55+
@BeforeEach
56+
public void beforeEachTest() {
57+
assertNotInTransaction();
58+
}
59+
60+
@AfterEach
61+
public void afterEachTest() {
62+
assertNotInTransaction();
63+
}
64+
65+
@DisplayName("Trying to run a blocking transaction (or anything blocking) on a non-blocking thread, will not work")
66+
@Test
67+
public void blockingTransactionOnNonBlockingThread() {
68+
try {
69+
Mono.just(1).publishOn(Schedulers.parallel()).flatMap(ignore -> {
70+
assertTrue(Schedulers.isInNonBlockingThread());
71+
assertTrue(Thread.currentThread().getName().contains("parallel"));
72+
73+
couchbaseClientFactory.getCluster().transactions().run(ctx -> {
74+
ops.insertById(Person.class).one(new Person("Walter", "White"));
75+
});
76+
return Mono.empty();
77+
}).block();
78+
fail();
79+
} catch (IllegalStateException ignored) {}
80+
}
81+
82+
@DisplayName("Trying to run a reactive transaction on a non-blocking thread should work")
83+
@Test
84+
public void reactiveTransactionOnNonBlockingThread() {
85+
Mono.just(1).publishOn(Schedulers.parallel()).flatMap(ignore -> {
86+
return couchbaseClientFactory.getCluster().reactive().transactions().run(ctx -> {
87+
return reactiveOps.insertById(Person.class).one(new Person("Walter", "White"));
88+
});
89+
}).block();
90+
}
91+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright 2022 the original author or authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.data.couchbase.transactions.sdk;
18+
19+
import static org.junit.jupiter.api.Assertions.assertTrue;
20+
import static org.junit.jupiter.api.Assertions.fail;
21+
import static org.springframework.data.couchbase.transactions.util.TransactionTestUtil.assertNotInTransaction;
22+
23+
import org.springframework.data.couchbase.domain.PersonWithoutVersion;
24+
import reactor.core.publisher.Mono;
25+
import reactor.core.scheduler.Schedulers;
26+
27+
import org.junit.jupiter.api.AfterEach;
28+
import org.junit.jupiter.api.BeforeEach;
29+
import org.junit.jupiter.api.DisplayName;
30+
import org.junit.jupiter.api.Test;
31+
import org.springframework.beans.factory.annotation.Autowired;
32+
import org.springframework.data.couchbase.CouchbaseClientFactory;
33+
import org.springframework.data.couchbase.core.CouchbaseTemplate;
34+
import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate;
35+
import org.springframework.data.couchbase.domain.Person;
36+
import org.springframework.data.couchbase.transactions.TransactionsConfig;
37+
import org.springframework.data.couchbase.util.Capabilities;
38+
import org.springframework.data.couchbase.util.ClusterType;
39+
import org.springframework.data.couchbase.util.IgnoreWhen;
40+
import org.springframework.data.couchbase.util.JavaIntegrationTests;
41+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
42+
43+
/**
44+
* Added for issue 1527: Tests the .save() command.
45+
*
46+
* @author Graham Pople
47+
*/
48+
@IgnoreWhen(missesCapabilities = Capabilities.QUERY, clusterTypes = ClusterType.MOCKED)
49+
@SpringJUnitConfig(TransactionsConfig.class)
50+
public class SDKTransactionsSaveIntegrationTests extends JavaIntegrationTests {
51+
@Autowired private CouchbaseClientFactory couchbaseClientFactory;
52+
@Autowired private CouchbaseTemplate ops;
53+
@Autowired private ReactiveCouchbaseTemplate reactiveOps;
54+
55+
@BeforeEach
56+
public void beforeEachTest() {
57+
assertNotInTransaction();
58+
}
59+
60+
@AfterEach
61+
public void afterEachTest() {
62+
assertNotInTransaction();
63+
}
64+
65+
@DisplayName("ReactiveCouchbaseTemplate.save() called inside a reactive SDK transaction should work")
66+
@Test
67+
public void reactiveSaveInReactiveTransaction() {
68+
couchbaseClientFactory.getCluster().reactive().transactions().run(ctx -> {
69+
PersonWithoutVersion p = new PersonWithoutVersion("Walter", "White");
70+
return reactiveOps.save(p);
71+
}).block();
72+
}
73+
74+
@DisplayName("ReactiveCouchbaseTemplate.save().block() called inside a non-reactive SDK transaction should work")
75+
@Test
76+
public void reactiveSaveInBlockingTransaction() {
77+
couchbaseClientFactory.getCluster().transactions().run(ctx -> {
78+
PersonWithoutVersion p = new PersonWithoutVersion("Walter", "White");
79+
reactiveOps.save(p).block();
80+
});
81+
}
82+
83+
@DisplayName("ReactiveCouchbaseTemplate.save() called inside a reactive SDK transaction should work")
84+
@Test
85+
public void blockingSaveInReactiveTransaction() {
86+
couchbaseClientFactory.getCluster().reactive().transactions().run(ctx -> {
87+
PersonWithoutVersion p = new PersonWithoutVersion("Walter", "White");
88+
ops.save(p);
89+
return Mono.empty();
90+
}).block();
91+
}
92+
93+
@DisplayName("ReactiveCouchbaseTemplate.save().block() called inside a non-reactive SDK transaction should work")
94+
@Test
95+
public void blockingSaveInBlockingTransaction() {
96+
couchbaseClientFactory.getCluster().transactions().run(ctx -> {
97+
PersonWithoutVersion p = new PersonWithoutVersion("Walter", "White");
98+
ops.save(p);
99+
});
100+
}
101+
}

0 commit comments

Comments
 (0)