Skip to content

Commit b32746c

Browse files
committed
DATACOUCH-605 - Support ScanConsistency in n1ql queries
1 parent 946e835 commit b32746c

13 files changed

+91
-22
lines changed

src/main/java/org/springframework/data/couchbase/core/ExecutableFindByQueryOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ interface FindByQueryConsistentWith<T> extends FindByQueryWithQuery<T> {
126126
*
127127
* @param scanConsistency the custom scan consistency to use for this query.
128128
*/
129-
FindByQueryWithQuery<T> consistentWith(QueryScanConsistency scanConsistency);
129+
FindByQueryConsistentWith<T> consistentWith(QueryScanConsistency scanConsistency);
130130

131131
}
132132

src/main/java/org/springframework/data/couchbase/core/ExecutableFindByQueryOperationSupport.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.springframework.data.couchbase.core.query.Query;
2222

2323
import com.couchbase.client.java.query.QueryScanConsistency;
24+
import org.springframework.data.couchbase.core.ReactiveFindByQueryOperationSupport.ReactiveFindByQuerySupport;
2425

2526
public class ExecutableFindByQueryOperationSupport implements ExecutableFindByQueryOperation {
2627

@@ -50,8 +51,7 @@ static class ExecutableFindByQuerySupport<T> implements ExecutableFindByQuery<T>
5051
this.template = template;
5152
this.domainType = domainType;
5253
this.query = query;
53-
this.reactiveSupport = new ReactiveFindByQueryOperationSupport.ReactiveFindByQuerySupport<T>(
54-
template.reactive(), domainType, query, scanConsistency);
54+
this.reactiveSupport = new ReactiveFindByQuerySupport<T>(template.reactive(), domainType, query, scanConsistency);
5555
this.scanConsistency = scanConsistency;
5656
}
5757

@@ -72,11 +72,17 @@ public List<T> all() {
7272

7373
@Override
7474
public TerminatingFindByQuery<T> matching(final Query query) {
75-
return new ExecutableFindByQuerySupport<>(template, domainType, query, scanConsistency);
75+
QueryScanConsistency scanCons;
76+
if (query.getConsistency() != null) {
77+
scanCons = query.getConsistency();
78+
} else {
79+
scanCons = scanConsistency;
80+
}
81+
return new ExecutableFindByQuerySupport<>(template, domainType, query, scanCons);
7682
}
7783

7884
@Override
79-
public FindByQueryWithQuery<T> consistentWith(final QueryScanConsistency scanConsistency) {
85+
public FindByQueryConsistentWith<T> consistentWith(final QueryScanConsistency scanConsistency) {
8086
return new ExecutableFindByQuerySupport<>(template, domainType, query, scanConsistency);
8187
}
8288

src/main/java/org/springframework/data/couchbase/core/ReactiveFindByQueryOperationSupport.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,13 @@ static class ReactiveFindByQuerySupport<T> implements ReactiveFindByQuery<T> {
6060

6161
@Override
6262
public TerminatingFindByQuery<T> matching(Query query) {
63-
return new ReactiveFindByQuerySupport<>(template, domainType, query, scanConsistency);
63+
QueryScanConsistency scanCons;
64+
if (query.getConsistency() != null) {
65+
scanCons = query.getConsistency();
66+
} else {
67+
scanCons = scanConsistency;
68+
}
69+
return new ReactiveFindByQuerySupport<>(template, domainType, query, scanCons);
6470
}
6571

6672
@Override

src/main/java/org/springframework/data/couchbase/core/ReactiveInsertByIdOperationSupport.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package org.springframework.data.couchbase.core;
1717

18-
import com.couchbase.client.java.kv.UpsertOptions;
1918
import org.springframework.data.couchbase.core.mapping.Document;
2019
import reactor.core.publisher.Flux;
2120
import reactor.core.publisher.Mono;
@@ -98,7 +97,7 @@ private InsertOptions buildInsertOptions() {
9897
} else if (durabilityLevel != DurabilityLevel.NONE) {
9998
options.durability(durabilityLevel);
10099
}
101-
if (expiry != null) {
100+
if (expiry != null && ! expiry.isZero()) {
102101
options.expiry(expiry);
103102
} else if (domainType.isAnnotationPresent(Document.class)) {
104103
Document documentAnn = domainType.getAnnotation(Document.class);

src/main/java/org/springframework/data/couchbase/core/ReactiveUpsertByIdOperationSupport.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package org.springframework.data.couchbase.core;
1717

18-
import com.couchbase.client.java.kv.Expiry;
1918
import org.springframework.data.couchbase.core.mapping.Document;
2019
import reactor.core.publisher.Flux;
2120
import reactor.core.publisher.Mono;
@@ -98,7 +97,7 @@ private UpsertOptions buildUpsertOptions() {
9897
} else if (durabilityLevel != DurabilityLevel.NONE) {
9998
options.durability(durabilityLevel);
10099
}
101-
if (expiry != null) {
100+
if (expiry != null && !expiry.isZero()) {
102101
options.expiry(expiry);
103102
} else if (domainType.isAnnotationPresent(Document.class)) {
104103
Document documentAnn = domainType.getAnnotation(Document.class);

src/main/java/org/springframework/data/couchbase/core/query/Query.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public class Query {
4747
private long skip;
4848
private int limit;
4949
private Sort sort = Sort.unsorted();
50+
private QueryScanConsistency queryScanConsistency;
5051

5152
static private final Pattern WHERE_PATTERN = Pattern.compile("\\sWHERE\\s");
5253

@@ -127,6 +128,26 @@ public Query with(final Pageable pageable) {
127128
return with(pageable.getSort());
128129
}
129130

131+
/**
132+
* queryScanConsistency
133+
*
134+
* @return queryScanConsistency
135+
*/
136+
public QueryScanConsistency getConsistency() {
137+
return queryScanConsistency;
138+
}
139+
140+
/**
141+
* Sets the given scan consistency on the {@link Query} instance.
142+
*
143+
* @param queryScanConsistency
144+
* @return this
145+
*/
146+
public Query with(final QueryScanConsistency queryScanConsistency) {
147+
this.queryScanConsistency = queryScanConsistency;
148+
return this;
149+
}
150+
130151
/**
131152
* Adds a {@link Sort} to the {@link Query} instance.
132153
*
@@ -270,7 +291,7 @@ StringBasedN1qlQueryParser.N1qlSpelValues getN1qlSpelValues(ReactiveCouchbaseTem
270291
}
271292

272293
/**
273-
* build QueryOptions forom parameters and scanConsistency
294+
* build QueryOptions from parameters and scanConsistency
274295
*
275296
* @param scanConsistency
276297
* @return QueryOptions

src/main/java/org/springframework/data/couchbase/repository/query/CouchbaseQueryMethod.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.springframework.data.couchbase.core.query.View;
2626
import org.springframework.data.couchbase.core.query.WithConsistency;
2727
import org.springframework.data.couchbase.repository.Query;
28+
import org.springframework.data.couchbase.repository.ScanConsistency;
2829
import org.springframework.data.mapping.context.MappingContext;
2930
import org.springframework.data.projection.ProjectionFactory;
3031
import org.springframework.data.repository.core.RepositoryMetadata;
@@ -152,6 +153,24 @@ public WithConsistency getConsistencyAnnotation() {
152153
return method.getAnnotation(WithConsistency.class);
153154
}
154155

156+
/**
157+
* If the method has a @ScanConsistency annotation
158+
*
159+
* @return true if this has the @ScanConsistency annotation
160+
*/
161+
public boolean hasScanConsistencyAnnotation() {
162+
return getScanConsistencyAnnotation() != null;
163+
}
164+
165+
/**
166+
* ScanConsistency annotation
167+
*
168+
* @return the @ScanConsistency annotation
169+
*/
170+
public ScanConsistency getScanConsistencyAnnotation() {
171+
return method.getAnnotation(ScanConsistency.class);
172+
}
173+
155174
/**
156175
* Returns the query string declared in a {@link Query} annotation or {@literal null} if neither the annotation found
157176
* nor the attribute was specified.

src/main/java/org/springframework/data/couchbase/repository/query/N1qlRepositoryQueryExecutor.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@
1515
*/
1616
package org.springframework.data.couchbase.repository.query;
1717

18-
import java.util.ArrayList;
19-
import java.util.List;
20-
18+
import com.couchbase.client.java.query.QueryScanConsistency;
2119
import org.springframework.data.couchbase.core.CouchbaseOperations;
2220
import org.springframework.data.couchbase.core.ExecutableFindByQueryOperation;
2321
import org.springframework.data.couchbase.core.query.Query;
@@ -65,7 +63,8 @@ public Object execute(final Object[] parameters) {
6563
final PartTree tree = new PartTree(queryMethod.getName(), domainClass);
6664
query = new N1qlQueryCreator(tree, accessor, queryMethod, operations.getConverter()).createQuery();
6765
}
68-
q = (ExecutableFindByQueryOperation.ExecutableFindByQuery) operations.findByQuery(domainClass).matching(query);
66+
q = (ExecutableFindByQueryOperation.ExecutableFindByQuery) operations.findByQuery(domainClass)
67+
.consistentWith(buildQueryScanConsistency()).matching(query);
6968
if (queryMethod.isCountQuery()) {
7069
return q.count();
7170
} else if (queryMethod.isCollectionQuery()) {
@@ -76,4 +75,14 @@ public Object execute(final Object[] parameters) {
7675

7776
}
7877

78+
private QueryScanConsistency buildQueryScanConsistency() {
79+
QueryScanConsistency scanConsistency = QueryScanConsistency.NOT_BOUNDED;
80+
if (queryMethod.hasConsistencyAnnotation()) {
81+
scanConsistency = queryMethod.getConsistencyAnnotation().value();
82+
} else if (queryMethod.hasScanConsistencyAnnotation()) {
83+
scanConsistency = queryMethod.getScanConsistencyAnnotation().query();
84+
}
85+
return scanConsistency;
86+
}
87+
7988
}

src/test/java/org/springframework/data/couchbase/core/CouchbaseTemplateKeyValueIntegrationTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class CouchbaseTemplateKeyValueIntegrationTests extends ClusterAwareIntegrationT
5555

5656
private static CouchbaseClientFactory couchbaseClientFactory;
5757
private CouchbaseTemplate couchbaseTemplate;
58+
private ReactiveCouchbaseTemplate reactiveCouchbaseTemplate;
5859

5960
@BeforeAll
6061
static void beforeAll() {
@@ -71,6 +72,7 @@ static void afterAll() throws IOException {
7172
void beforeEach() {
7273
ApplicationContext ac = new AnnotationConfigApplicationContext(Config.class);
7374
couchbaseTemplate = (CouchbaseTemplate) ac.getBean(COUCHBASE_TEMPLATE);
75+
reactiveCouchbaseTemplate = (ReactiveCouchbaseTemplate) ac.getBean(REACTIVE_COUCHBASE_TEMPLATE);
7476
}
7577

7678
@Test
@@ -89,6 +91,7 @@ void upsertAndFindById() {
8991
assertEquals(user, found);
9092

9193
couchbaseTemplate.removeById().one(user.getId());
94+
reactiveCouchbaseTemplate.replaceById(User.class).withDurability(PersistTo.ACTIVE, ReplicateTo.THREE).one(user);
9295
}
9396

9497
@Test

src/test/java/org/springframework/data/couchbase/domain/AirportRepository.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,19 @@ public interface AirportRepository extends PagingAndSortingRepository<Airport, S
4040
Iterable<Airport> findAll();
4141

4242
@Override
43-
@ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS)
4443
Airport save(Airport airport);
4544

4645
@ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS)
4746
List<Airport> findAllByIata(String iata);
4847

48+
@ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS)
49+
Airport findByIata(String iata);
50+
4951
@Query("#{#n1ql.selectEntity} where iata = $1")
52+
@ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS)
5053
List<Airport> getAllByIata(String iata);
5154

55+
@ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS)
5256
long countByIataIn(String... iata);
5357

5458
@ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS)
@@ -60,5 +64,4 @@ public interface AirportRepository extends PagingAndSortingRepository<Airport, S
6064
@Override
6165
@ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS)
6266
long count();
63-
6467
}

src/test/java/org/springframework/data/couchbase/domain/ReactiveAirportRepository.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.data.couchbase.domain;
1818

19+
import org.springframework.data.couchbase.repository.Query;
1920
import reactor.core.publisher.Flux;
2021

2122
import org.springframework.data.couchbase.repository.ScanConsistency;
@@ -44,6 +45,10 @@ public interface ReactiveAirportRepository extends ReactiveSortingRepository<Air
4445
@ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS)
4546
Flux<Airport> findAllByIata(String iata);
4647

48+
@Query("#{#n1ql.selectEntity} where iata = $1")
49+
@ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS)
50+
Flux<Airport> getAllByIata(String iata);
51+
4752
@ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS)
4853
Mono<Long> countByIataIn(String... iatas);
4954

src/test/java/org/springframework/data/couchbase/repository/CouchbaseRepositoryQueryIntegrationTests.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
import java.util.concurrent.ExecutorService;
2626
import java.util.concurrent.Executors;
2727
import java.util.concurrent.Future;
28-
import java.util.stream.Collectors;
29-
import java.util.stream.StreamSupport;
3028

3129
import org.junit.jupiter.api.BeforeEach;
3230
import org.junit.jupiter.api.Test;

src/test/java/org/springframework/data/couchbase/repository/ReactiveCouchbaseRepositoryQueryIntegrationTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,10 @@ void findBySimpleProperty() {
8888
try {
8989
vie = new Airport("airports::vie", "vie", "loww");
9090
airportRepository.save(vie).block();
91-
List<Airport> airports = airportRepository.findAllByIata("vie").collectList().block();
92-
// TODO
93-
System.err.println(airports);
91+
List<Airport> airports1 = airportRepository.findAllByIata("vie").collectList().block();
92+
assertEquals(1,airports1.size());
93+
List<Airport> airports2 = airportRepository.findAllByIata("vie").collectList().block();
94+
assertEquals(1,airports2.size());
9495
} finally {
9596
airportRepository.delete(vie).block();
9697
}

0 commit comments

Comments
 (0)