Skip to content

Commit 9f958a9

Browse files
committed
Fix ThreadLocal Issue with Repository Save. (#1840)
The issue was introduced when the Mono.deferContextual() was added to determine if the save() is in a transaction. It may be executing in a different thread when the PseudoArgs (scope, collection, and options) are retrieved ThreadLocal. This change ensures scope and collection are retrieved, but options are ignored and discarded. Closes #1838.
1 parent e64a489 commit 9f958a9

File tree

6 files changed

+75
-24
lines changed

6 files changed

+75
-24
lines changed

src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java

+1-9
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public class ReactiveCouchbaseTemplate implements ReactiveCouchbaseOperations, A
5353
private final CouchbaseConverter converter;
5454
private final PersistenceExceptionTranslator exceptionTranslator;
5555
private final ReactiveCouchbaseTemplateSupport templateSupport;
56-
private ThreadLocal<PseudoArgs<?>> threadLocalArgs = new ThreadLocal<>();
56+
private final ThreadLocal<PseudoArgs<?>> threadLocalArgs = new ThreadLocal<>();
5757
private final QueryScanConsistency scanConsistency;
5858

5959
public ReactiveCouchbaseTemplate(final CouchbaseClientFactory clientFactory, final CouchbaseConverter converter) {
@@ -257,14 +257,6 @@ public PseudoArgs<?> getPseudoArgs() {
257257
* set the ThreadLocal field
258258
*/
259259
public void setPseudoArgs(PseudoArgs<?> threadLocalArgs) {
260-
if (this.threadLocalArgs == null) {
261-
synchronized (this) {
262-
if (this.threadLocalArgs == null) {
263-
this.threadLocalArgs = new ThreadLocal<>();
264-
}
265-
}
266-
}
267-
268260
this.threadLocalArgs.set(threadLocalArgs);
269261
}
270262

src/main/java/org/springframework/data/couchbase/repository/support/CouchbaseRepositoryBase.java

+17-6
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
import java.lang.reflect.AnnotatedElement;
2020

21+
import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate;
2122
import org.springframework.data.couchbase.core.query.OptionsBuilder;
23+
import org.springframework.data.couchbase.core.support.PseudoArgs;
2224
import org.springframework.data.couchbase.repository.Collection;
2325
import org.springframework.data.couchbase.repository.ScanConsistency;
2426
import org.springframework.data.couchbase.repository.Scope;
@@ -35,7 +37,7 @@
3537
*
3638
* @author Michael Reiche
3739
*/
38-
public class CouchbaseRepositoryBase<T, ID> {
40+
public abstract class CouchbaseRepositoryBase<T, ID> {
3941

4042
/**
4143
* Contains information about the entity being used in this repository.
@@ -82,9 +84,11 @@ <S extends T> String getId(S entity) {
8284

8385
protected String getScope() {
8486
String fromAnnotation = OptionsBuilder.annotationString(Scope.class, CollectionIdentifier.DEFAULT_SCOPE,
85-
new AnnotatedElement[] { getJavaType(), repositoryInterface });
87+
new AnnotatedElement[] { getJavaType(), getRepositoryInterface() });
8688
String fromMetadata = crudMethodMetadata != null ? crudMethodMetadata.getScope() : null;
87-
return OptionsBuilder.fromFirst(CollectionIdentifier.DEFAULT_SCOPE, fromMetadata, fromAnnotation);
89+
PseudoArgs<?> pa = getReactiveTemplate().getPseudoArgs();
90+
String fromThreadLocal = pa != null ? pa.getScope() : null;
91+
return OptionsBuilder.fromFirst(CollectionIdentifier.DEFAULT_SCOPE, fromThreadLocal, fromMetadata, fromAnnotation);
8892
}
8993

9094
/**
@@ -96,12 +100,18 @@ protected String getScope() {
96100
* 1. repository.withCollection()
97101
*/
98102
protected String getCollection() {
99-
String fromAnnotation = OptionsBuilder.annotationString(Collection.class, CollectionIdentifier.DEFAULT_COLLECTION,
100-
new AnnotatedElement[] { getJavaType(), repositoryInterface });
103+
String fromAnnotation = OptionsBuilder.annotationString(Collection.class,
104+
CollectionIdentifier.DEFAULT_COLLECTION,
105+
new AnnotatedElement[] { getJavaType(), getRepositoryInterface() });
101106
String fromMetadata = crudMethodMetadata != null ? crudMethodMetadata.getCollection() : null;
102-
return OptionsBuilder.fromFirst(CollectionIdentifier.DEFAULT_COLLECTION, fromMetadata, fromAnnotation);
107+
PseudoArgs<?> pa = getReactiveTemplate().getPseudoArgs();
108+
String fromThreadLocal = pa != null ? pa.getCollection() : null;
109+
return OptionsBuilder.fromFirst(CollectionIdentifier.DEFAULT_COLLECTION, fromThreadLocal, fromMetadata,
110+
fromAnnotation);
103111
}
104112

113+
protected abstract ReactiveCouchbaseTemplate getReactiveTemplate();
114+
105115
/**
106116
* Get the QueryScanConsistency from <br>
107117
* 1. The method annotation (method *could* be available from crudMethodMetadata)<br>
@@ -132,4 +142,5 @@ QueryScanConsistency getQueryScanConsistency() {
132142
void setRepositoryMethodMetadata(CrudMethodMetadata crudMethodMetadata) {
133143
this.crudMethodMetadata = crudMethodMetadata;
134144
}
145+
135146
}

src/main/java/org/springframework/data/couchbase/repository/support/SimpleCouchbaseRepository.java

+13-4
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
import java.util.stream.Collectors;
2424

2525
import org.springframework.data.couchbase.core.CouchbaseOperations;
26-
import org.springframework.data.couchbase.core.mapping.CouchbasePersistentEntity;
27-
import org.springframework.data.couchbase.core.mapping.CouchbasePersistentProperty;
26+
import org.springframework.data.couchbase.core.CouchbaseTemplate;
27+
import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate;
2828
import org.springframework.data.couchbase.core.query.Query;
2929
import org.springframework.data.couchbase.repository.CouchbaseRepository;
3030
import org.springframework.data.couchbase.repository.query.CouchbaseEntityInformation;
@@ -37,7 +37,6 @@
3737
import org.springframework.util.Assert;
3838

3939
import com.couchbase.client.java.query.QueryScanConsistency;
40-
import org.springframework.util.ReflectionUtils;
4140

4241
/**
4342
* Repository base implementation for Couchbase.
@@ -71,7 +70,13 @@ public SimpleCouchbaseRepository(CouchbaseEntityInformation<T, String> entityInf
7170
@Override
7271
@SuppressWarnings("unchecked")
7372
public <S extends T> S save(S entity) {
74-
return operations.save(entity, getScope(), getCollection());
73+
String scopeName = getScope();
74+
String collectionName = getCollection();
75+
// clear out the PseudoArgs here as whatever is called by operations.save() could be in a different thread.
76+
// not that this will also clear out Options, but that's ok as any options would not work
77+
// with all of insert/upsert/replace. If Options are needed, use template.insertById/upsertById/replaceById
78+
getReactiveTemplate().setPseudoArgs(null);
79+
return operations.save(entity, scopeName, collectionName);
7580
}
7681

7782
@Override
@@ -177,4 +182,8 @@ public CouchbaseOperations getOperations() {
177182
return operations;
178183
}
179184

185+
@Override
186+
protected ReactiveCouchbaseTemplate getReactiveTemplate() {
187+
return ((CouchbaseTemplate) getOperations()).reactive();
188+
}
180189
}

src/main/java/org/springframework/data/couchbase/repository/support/SimpleReactiveCouchbaseRepository.java

+13-4
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,13 @@
2626
import org.reactivestreams.Publisher;
2727
import org.springframework.data.couchbase.core.CouchbaseOperations;
2828
import org.springframework.data.couchbase.core.ReactiveCouchbaseOperations;
29-
import org.springframework.data.couchbase.core.mapping.CouchbasePersistentEntity;
30-
import org.springframework.data.couchbase.core.mapping.CouchbasePersistentProperty;
29+
import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate;
3130
import org.springframework.data.couchbase.core.query.Query;
3231
import org.springframework.data.couchbase.repository.ReactiveCouchbaseRepository;
3332
import org.springframework.data.couchbase.repository.query.CouchbaseEntityInformation;
3433
import org.springframework.data.domain.Sort;
3534
import org.springframework.data.util.Streamable;
3635
import org.springframework.util.Assert;
37-
import org.springframework.util.ReflectionUtils;
3836

3937
/**
4038
* Reactive repository base implementation for Couchbase.
@@ -76,7 +74,13 @@ public Flux<T> findAll(Sort sort) {
7674
@SuppressWarnings("unchecked")
7775
@Override
7876
public <S extends T> Mono<S> save(S entity) {
79-
return save(entity, getScope(), getCollection());
77+
String scopeName = getScope();
78+
String collectionName = getCollection();
79+
// clear out the PseudoArgs here as whatever is called by operations.save() could be in a different thread.
80+
// not that this will also clear out Options, but that's ok as any options would not work
81+
// with all of insert/upsert/replace. If Options are needed, use template.insertById/upsertById/replaceById
82+
getReactiveTemplate().setPseudoArgs(null);
83+
return operations.save(entity, scopeName, collectionName);
8084
}
8185

8286
@Override
@@ -227,4 +231,9 @@ public ReactiveCouchbaseOperations getOperations() {
227231
return operations;
228232
}
229233

234+
@Override
235+
protected ReactiveCouchbaseTemplate getReactiveTemplate() {
236+
return (ReactiveCouchbaseTemplate) getOperations();
237+
}
238+
230239
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package org.springframework.data.couchbase.domain;
2+
3+
import org.springframework.data.couchbase.repository.Collection;
4+
import org.springframework.data.couchbase.repository.Scope;
5+
6+
@Scope("must set scope name")
7+
@Collection("my_collection")
8+
public interface ReactiveAirportMustScopeRepository extends ReactiveAirportRepository {
9+
}

src/test/java/org/springframework/data/couchbase/repository/query/ReactiveCouchbaseRepositoryQueryCollectionIntegrationTests.java

+22-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@
1818
import static org.junit.jupiter.api.Assertions.assertEquals;
1919
import static org.junit.jupiter.api.Assertions.assertThrows;
2020

21+
import reactor.core.Disposable;
22+
2123
import java.util.List;
24+
import java.util.Random;
25+
import java.util.UUID;
2226

2327
import org.junit.jupiter.api.AfterAll;
2428
import org.junit.jupiter.api.AfterEach;
@@ -30,7 +34,8 @@
3034
import org.springframework.data.couchbase.core.CouchbaseTemplate;
3135
import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate;
3236
import org.springframework.data.couchbase.domain.Airport;
33-
import org.springframework.data.couchbase.domain.ConfigScoped;;
37+
import org.springframework.data.couchbase.domain.ConfigScoped;
38+
import org.springframework.data.couchbase.domain.ReactiveAirportMustScopeRepository;
3439
import org.springframework.data.couchbase.domain.ReactiveAirportRepository;
3540
import org.springframework.data.couchbase.domain.ReactiveAirportRepositoryAnnotated;
3641
import org.springframework.data.couchbase.domain.ReactiveUserColRepository;
@@ -61,6 +66,7 @@ public class ReactiveCouchbaseRepositoryQueryCollectionIntegrationTests extends
6166

6267
@Autowired ReactiveAirportRepository reactiveAirportRepository;
6368
@Autowired ReactiveAirportRepositoryAnnotated reactiveAirportRepositoryAnnotated;
69+
@Autowired ReactiveAirportMustScopeRepository reactiveAirportMustScopeRepository;
6470
@Autowired ReactiveUserColRepository userColRepository;
6571
@Autowired public CouchbaseTemplate couchbaseTemplate;
6672
@Autowired public ReactiveCouchbaseTemplate reactiveCouchbaseTemplate;
@@ -116,6 +122,21 @@ public void myTest() {
116122

117123
}
118124

125+
@Test
126+
void testThreadLocal() throws InterruptedException {
127+
128+
String scopeName = "my_scope";
129+
String id = UUID.randomUUID().toString();
130+
131+
Airport airport = new Airport(id, "testThreadLocal", "icao");
132+
Disposable s = reactiveAirportMustScopeRepository.withScope(scopeName).findById(airport.getId()).doOnNext(u -> {
133+
throw new RuntimeException("User already Exists! " + u);
134+
}).then(reactiveAirportMustScopeRepository.withScope(scopeName).save(airport))
135+
.subscribe(u -> LOGGER.info("User Persisted Successfully! {}", u));
136+
137+
reactiveAirportMustScopeRepository.withScope(scopeName).deleteById(id).block();
138+
}
139+
119140
/**
120141
* can test against _default._default without setting up additional scope/collection and also test for collections and
121142
* scopes that do not exist These same tests should be repeated on non-default scope and collection in a test that

0 commit comments

Comments
 (0)