Skip to content

Commit 82edad5

Browse files
committed
DATACOUCH-605 - Support ScanConsistency in n1ql queries
1 parent fab72ee commit 82edad5

17 files changed

+117
-28
lines changed

src/main/java/org/springframework/data/couchbase/core/ExecutableFindByQueryOperation.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ interface FindByQueryConsistentWith<T> extends FindByQueryWithQuery<T> {
126126
*
127127
* @param scanConsistency the custom scan consistency to use for this query.
128128
*/
129-
FindByQueryWithQuery<T> consistentWith(QueryScanConsistency scanConsistency);
129+
FindByQueryConsistentWith<T> consistentWith(QueryScanConsistency scanConsistency);
130130

131131
}
132132

src/main/java/org/springframework/data/couchbase/core/ExecutableFindByQueryOperationSupport.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.springframework.data.couchbase.core.query.Query;
2222

2323
import com.couchbase.client.java.query.QueryScanConsistency;
24+
import org.springframework.data.couchbase.core.ReactiveFindByQueryOperationSupport.ReactiveFindByQuerySupport;
2425

2526
public class ExecutableFindByQueryOperationSupport implements ExecutableFindByQueryOperation {
2627

@@ -50,8 +51,7 @@ static class ExecutableFindByQuerySupport<T> implements ExecutableFindByQuery<T>
5051
this.template = template;
5152
this.domainType = domainType;
5253
this.query = query;
53-
this.reactiveSupport = new ReactiveFindByQueryOperationSupport.ReactiveFindByQuerySupport<T>(
54-
template.reactive(), domainType, query, scanConsistency);
54+
this.reactiveSupport = new ReactiveFindByQuerySupport<T>(template.reactive(), domainType, query, scanConsistency);
5555
this.scanConsistency = scanConsistency;
5656
}
5757

@@ -72,11 +72,17 @@ public List<T> all() {
7272

7373
@Override
7474
public TerminatingFindByQuery<T> matching(final Query query) {
75-
return new ExecutableFindByQuerySupport<>(template, domainType, query, scanConsistency);
75+
QueryScanConsistency scanCons;
76+
if (query.getScanConsistency() != null) {
77+
scanCons = query.getScanConsistency();
78+
} else {
79+
scanCons = scanConsistency;
80+
}
81+
return new ExecutableFindByQuerySupport<>(template, domainType, query, scanCons);
7682
}
7783

7884
@Override
79-
public FindByQueryWithQuery<T> consistentWith(final QueryScanConsistency scanConsistency) {
85+
public FindByQueryConsistentWith<T> consistentWith(final QueryScanConsistency scanConsistency) {
8086
return new ExecutableFindByQuerySupport<>(template, domainType, query, scanConsistency);
8187
}
8288

src/main/java/org/springframework/data/couchbase/core/ReactiveFindByQueryOperation.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ interface FindByQueryConsistentWith<T> extends FindByQueryWithQuery<T> {
100100
*
101101
* @param scanConsistency the custom scan consistency to use for this query.
102102
*/
103-
FindByQueryWithQuery<T> consistentWith(QueryScanConsistency scanConsistency);
103+
FindByQueryConsistentWith<T> consistentWith(QueryScanConsistency scanConsistency);
104104

105105
}
106106

src/main/java/org/springframework/data/couchbase/core/ReactiveFindByQueryOperationSupport.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,17 @@ static class ReactiveFindByQuerySupport<T> implements ReactiveFindByQuery<T> {
6060

6161
@Override
6262
public TerminatingFindByQuery<T> matching(Query query) {
63-
return new ReactiveFindByQuerySupport<>(template, domainType, query, scanConsistency);
63+
QueryScanConsistency scanCons;
64+
if (query.getScanConsistency() != null) {
65+
scanCons = query.getScanConsistency();
66+
} else {
67+
scanCons = scanConsistency;
68+
}
69+
return new ReactiveFindByQuerySupport<>(template, domainType, query, scanCons);
6470
}
6571

6672
@Override
67-
public FindByQueryWithQuery<T> consistentWith(QueryScanConsistency scanConsistency) {
73+
public FindByQueryConsistentWith<T> consistentWith(QueryScanConsistency scanConsistency) {
6874
return new ReactiveFindByQuerySupport<>(template, domainType, query, scanConsistency);
6975
}
7076

src/main/java/org/springframework/data/couchbase/core/ReactiveInsertByIdOperationSupport.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package org.springframework.data.couchbase.core;
1717

18-
import com.couchbase.client.java.kv.UpsertOptions;
1918
import org.springframework.data.couchbase.core.mapping.Document;
2019
import reactor.core.publisher.Flux;
2120
import reactor.core.publisher.Mono;
@@ -98,7 +97,7 @@ private InsertOptions buildInsertOptions() {
9897
} else if (durabilityLevel != DurabilityLevel.NONE) {
9998
options.durability(durabilityLevel);
10099
}
101-
if (expiry != null) {
100+
if (expiry != null && ! expiry.isZero()) {
102101
options.expiry(expiry);
103102
} else if (domainType.isAnnotationPresent(Document.class)) {
104103
Document documentAnn = domainType.getAnnotation(Document.class);

src/main/java/org/springframework/data/couchbase/core/ReactiveUpsertByIdOperationSupport.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package org.springframework.data.couchbase.core;
1717

18-
import com.couchbase.client.java.kv.Expiry;
1918
import org.springframework.data.couchbase.core.mapping.Document;
2019
import reactor.core.publisher.Flux;
2120
import reactor.core.publisher.Mono;
@@ -98,7 +97,7 @@ private UpsertOptions buildUpsertOptions() {
9897
} else if (durabilityLevel != DurabilityLevel.NONE) {
9998
options.durability(durabilityLevel);
10099
}
101-
if (expiry != null) {
100+
if (expiry != null && !expiry.isZero()) {
102101
options.expiry(expiry);
103102
} else if (domainType.isAnnotationPresent(Document.class)) {
104103
Document documentAnn = domainType.getAnnotation(Document.class);

src/main/java/org/springframework/data/couchbase/core/query/Query.java

+23-1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public class Query {
4747
private long skip;
4848
private int limit;
4949
private Sort sort = Sort.unsorted();
50+
private QueryScanConsistency queryScanConsistency;
5051

5152
static private final Pattern WHERE_PATTERN = Pattern.compile("\\sWHERE\\s");
5253

@@ -127,6 +128,27 @@ public Query with(final Pageable pageable) {
127128
return with(pageable.getSort());
128129
}
129130

131+
/**
132+
* queryScanConsistency
133+
*
134+
* @return queryScanConsistency
135+
*/
136+
public QueryScanConsistency getScanConsistency() {
137+
return queryScanConsistency;
138+
}
139+
140+
141+
/**
142+
* Sets the given scan consistency on the {@link Query} instance.
143+
*
144+
* @param queryScanConsistency
145+
* @return this
146+
*/
147+
public Query scanConsistency(final QueryScanConsistency queryScanConsistency) {
148+
this.queryScanConsistency = queryScanConsistency;
149+
return this;
150+
}
151+
130152
/**
131153
* Adds a {@link Sort} to the {@link Query} instance.
132154
*
@@ -280,7 +302,7 @@ StringBasedN1qlQueryParser.N1qlSpelValues getN1qlSpelValues(ReactiveCouchbaseTem
280302
}
281303

282304
/**
283-
* build QueryOptions forom parameters and scanConsistency
305+
* build QueryOptions from parameters and scanConsistency
284306
*
285307
* @param scanConsistency
286308
* @return QueryOptions

src/main/java/org/springframework/data/couchbase/repository/CouchbaseRepository.java

+3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.List;
2020

21+
import com.couchbase.client.java.query.QueryScanConsistency;
2122
import org.springframework.data.domain.Sort;
2223
import org.springframework.data.repository.NoRepositoryBean;
2324
import org.springframework.data.repository.PagingAndSortingRepository;
@@ -34,6 +35,8 @@ public interface CouchbaseRepository<T, ID> extends PagingAndSortingRepository<T
3435
@Override
3536
List<T> findAll(Sort sort);
3637

38+
List<T> findAll(QueryScanConsistency queryScanConsistency);
39+
3740
@Override
3841
List<T> findAll();
3942

src/main/java/org/springframework/data/couchbase/repository/query/CouchbaseQueryMethod.java

+19
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.springframework.data.couchbase.core.query.View;
2626
import org.springframework.data.couchbase.core.query.WithConsistency;
2727
import org.springframework.data.couchbase.repository.Query;
28+
import org.springframework.data.couchbase.repository.ScanConsistency;
2829
import org.springframework.data.mapping.context.MappingContext;
2930
import org.springframework.data.projection.ProjectionFactory;
3031
import org.springframework.data.repository.core.RepositoryMetadata;
@@ -152,6 +153,24 @@ public WithConsistency getConsistencyAnnotation() {
152153
return method.getAnnotation(WithConsistency.class);
153154
}
154155

156+
/**
157+
* If the method has a @ScanConsistency annotation
158+
*
159+
* @return true if this has the @ScanConsistency annotation
160+
*/
161+
public boolean hasScanConsistencyAnnotation() {
162+
return getScanConsistencyAnnotation() != null;
163+
}
164+
165+
/**
166+
* ScanConsistency annotation
167+
*
168+
* @return the @ScanConsistency annotation
169+
*/
170+
public ScanConsistency getScanConsistencyAnnotation() {
171+
return method.getAnnotation(ScanConsistency.class);
172+
}
173+
155174
/**
156175
* Returns the query string declared in a {@link Query} annotation or {@literal null} if neither the annotation found
157176
* nor the attribute was specified.

src/main/java/org/springframework/data/couchbase/repository/query/N1qlRepositoryQueryExecutor.java

+13-4
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@
1515
*/
1616
package org.springframework.data.couchbase.repository.query;
1717

18-
import java.util.ArrayList;
19-
import java.util.List;
20-
18+
import com.couchbase.client.java.query.QueryScanConsistency;
2119
import org.springframework.data.couchbase.core.CouchbaseOperations;
2220
import org.springframework.data.couchbase.core.ExecutableFindByQueryOperation;
2321
import org.springframework.data.couchbase.core.query.Query;
@@ -65,7 +63,8 @@ public Object execute(final Object[] parameters) {
6563
final PartTree tree = new PartTree(queryMethod.getName(), domainClass);
6664
query = new N1qlQueryCreator(tree, accessor, queryMethod, operations.getConverter()).createQuery();
6765
}
68-
q = (ExecutableFindByQueryOperation.ExecutableFindByQuery) operations.findByQuery(domainClass).matching(query);
66+
q = (ExecutableFindByQueryOperation.ExecutableFindByQuery) operations.findByQuery(domainClass)
67+
.consistentWith(buildQueryScanConsistency()).matching(query);
6968
if (queryMethod.isCountQuery()) {
7069
return q.count();
7170
} else if (queryMethod.isCollectionQuery()) {
@@ -76,4 +75,14 @@ public Object execute(final Object[] parameters) {
7675

7776
}
7877

78+
private QueryScanConsistency buildQueryScanConsistency() {
79+
QueryScanConsistency scanConsistency = QueryScanConsistency.NOT_BOUNDED;
80+
if (queryMethod.hasConsistencyAnnotation()) {
81+
scanConsistency = queryMethod.getConsistencyAnnotation().value();
82+
} else if (queryMethod.hasScanConsistencyAnnotation()) {
83+
scanConsistency = queryMethod.getScanConsistencyAnnotation().query();
84+
}
85+
return scanConsistency;
86+
}
87+
7988
}

src/main/java/org/springframework/data/couchbase/repository/query/ReactiveN1qlRepositoryQueryExecutor.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package org.springframework.data.couchbase.repository.query;
1717

18+
import com.couchbase.client.java.query.QueryScanConsistency;
1819
import org.springframework.data.couchbase.core.ExecutableFindByQueryOperation;
1920
import org.springframework.data.couchbase.core.ReactiveFindByQueryOperation;
2021
import org.springframework.data.mapping.context.MappingContext;
@@ -69,7 +70,8 @@ public Object execute(final Object[] parameters) {
6970
final PartTree tree = new PartTree(queryMethod.getName(), domainClass);
7071
query = new N1qlQueryCreator(tree, accessor, queryMethod, operations.getConverter()).createQuery();
7172
}
72-
q = (ReactiveFindByQueryOperation.ReactiveFindByQuery) operations.findByQuery(domainClass).matching(query);
73+
q = (ReactiveFindByQueryOperation.ReactiveFindByQuery) operations.findByQuery(domainClass)
74+
.consistentWith(buildQueryScanConsistency()).matching(query);
7375
if (queryMethod.isCountQuery()) {
7476
return q.count();
7577
} else if (queryMethod.isCollectionQuery()) {
@@ -79,4 +81,14 @@ public Object execute(final Object[] parameters) {
7981
}
8082
}
8183

84+
private QueryScanConsistency buildQueryScanConsistency() {
85+
QueryScanConsistency scanConsistency = QueryScanConsistency.NOT_BOUNDED;
86+
if (queryMethod.hasConsistencyAnnotation()) {
87+
scanConsistency = queryMethod.getConsistencyAnnotation().value();
88+
} else if (queryMethod.hasScanConsistencyAnnotation()) {
89+
scanConsistency = queryMethod.getScanConsistencyAnnotation().query();
90+
}
91+
return scanConsistency;
92+
}
93+
8294
}

src/main/java/org/springframework/data/couchbase/repository/support/SimpleCouchbaseRepository.java

+5
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,11 @@ public List<T> findAll(final Sort sort) {
147147
return findAll(new Query().with(sort));
148148
}
149149

150+
@Override
151+
public List<T> findAll(final QueryScanConsistency queryScanConsistency) {
152+
return findAll(new Query().scanConsistency(queryScanConsistency));
153+
}
154+
150155
@Override
151156
public Page<T> findAll(final Pageable pageable) {
152157
List<T> results = findAll(new Query().with(pageable));

src/test/java/org/springframework/data/couchbase/core/CouchbaseTemplateKeyValueIntegrationTests.java

+3
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class CouchbaseTemplateKeyValueIntegrationTests extends ClusterAwareIntegrationT
5555

5656
private static CouchbaseClientFactory couchbaseClientFactory;
5757
private CouchbaseTemplate couchbaseTemplate;
58+
private ReactiveCouchbaseTemplate reactiveCouchbaseTemplate;
5859

5960
@BeforeAll
6061
static void beforeAll() {
@@ -71,6 +72,7 @@ static void afterAll() throws IOException {
7172
void beforeEach() {
7273
ApplicationContext ac = new AnnotationConfigApplicationContext(Config.class);
7374
couchbaseTemplate = (CouchbaseTemplate) ac.getBean(COUCHBASE_TEMPLATE);
75+
reactiveCouchbaseTemplate = (ReactiveCouchbaseTemplate) ac.getBean(REACTIVE_COUCHBASE_TEMPLATE);
7476
}
7577

7678
@Test
@@ -89,6 +91,7 @@ void upsertAndFindById() {
8991
assertEquals(user, found);
9092

9193
couchbaseTemplate.removeById().one(user.getId());
94+
reactiveCouchbaseTemplate.replaceById(User.class).withDurability(PersistTo.ACTIVE, ReplicateTo.THREE).one(user);
9295
}
9396

9497
@Test

src/test/java/org/springframework/data/couchbase/domain/AirportRepository.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,19 @@ public interface AirportRepository extends PagingAndSortingRepository<Airport, S
4040
Iterable<Airport> findAll();
4141

4242
@Override
43-
@ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS)
4443
Airport save(Airport airport);
4544

4645
@ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS)
4746
List<Airport> findAllByIata(String iata);
4847

48+
@ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS)
49+
Airport findByIata(String iata);
50+
4951
@Query("#{#n1ql.selectEntity} where iata = $1")
52+
@ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS)
5053
List<Airport> getAllByIata(String iata);
5154

55+
@ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS)
5256
long countByIataIn(String... iata);
5357

5458
@ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS)

src/test/java/org/springframework/data/couchbase/domain/ReactiveAirportRepository.java

+5
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.data.couchbase.domain;
1818

19+
import org.springframework.data.couchbase.repository.Query;
1920
import reactor.core.publisher.Flux;
2021

2122
import org.springframework.data.couchbase.repository.ScanConsistency;
@@ -44,6 +45,10 @@ public interface ReactiveAirportRepository extends ReactiveSortingRepository<Air
4445
@ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS)
4546
Flux<Airport> findAllByIata(String iata);
4647

48+
@Query("#{#n1ql.selectEntity} where iata = $1")
49+
@ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS)
50+
Flux<Airport> getAllByIata(String iata);
51+
4752
@ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS)
4853
Mono<Long> countByIataIn(String... iatas);
4954

src/test/java/org/springframework/data/couchbase/repository/CouchbaseRepositoryQueryIntegrationTests.java

+2-6
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
import java.util.concurrent.ExecutorService;
2626
import java.util.concurrent.Executors;
2727
import java.util.concurrent.Future;
28-
import java.util.stream.Collectors;
29-
import java.util.stream.StreamSupport;
3028

3129
import org.junit.jupiter.api.BeforeEach;
3230
import org.junit.jupiter.api.Test;
@@ -93,7 +91,6 @@ void findByInjection() {
9391
airportRepository.save(vie);
9492
xxx = new Airport("airports::xxx", "xxx", "xxxx");
9593
airportRepository.save(xxx);
96-
sleep(1000);
9794
List<Airport> airports;
9895
airports = airportRepository.findAllByIata("1\" or iata=iata or iata=\"1");
9996
assertEquals(0, airports.size());
@@ -112,7 +109,6 @@ void findBySimpleProperty() {
112109
try {
113110
vie = new Airport("airports::vie", "vie", "loww");
114111
airportRepository.save(vie);
115-
sleep(1000);
116112
List<Airport> airports = airportRepository.findAllByIata("vie");
117113
assertEquals(vie.getId(), airports.get(0).getId());
118114
} finally {
@@ -170,7 +166,7 @@ void threadSafeParametersTest() throws Exception {
170166
iatas[i].toLowerCase(Locale.ROOT) /* lcao */);
171167
airportRepository.save(airport);
172168
}
173-
sleep(1000);
169+
174170
for (int k = 0; k < 50; k++) {
175171
Callable<Boolean>[] suppliers = new Callable[iatas.length];
176172
for (int i = 0; i < iatas.length; i++) {
@@ -211,7 +207,7 @@ void threadSafeStringParametersTest() throws Exception {
211207
Airport airport = new Airport("airports::" + iatas[i], iatas[i] /*iata*/, iatas[i].toLowerCase() /* lcao */);
212208
airportRepository.save(airport);
213209
}
214-
sleep(1000);
210+
215211
for (int k = 0; k < 100; k++) {
216212
Callable<Boolean>[] suppliers = new Callable[iatas.length];
217213
for (int i = 0; i < iatas.length; i++) {

0 commit comments

Comments
 (0)