From 4862dcc39d5346d588bec9ad6317b4c94e825113 Mon Sep 17 00:00:00 2001 From: mikereiche Date: Thu, 1 Oct 2020 21:35:19 -0700 Subject: [PATCH] DATACOUCH-605 - Support ScanConsistency in n1ql queries --- .../core/ExecutableFindByQueryOperation.java | 2 +- ...ExecutableFindByQueryOperationSupport.java | 14 +++++++---- .../core/ReactiveFindByQueryOperation.java | 2 +- .../ReactiveFindByQueryOperationSupport.java | 10 ++++++-- .../ReactiveInsertByIdOperationSupport.java | 3 +-- .../ReactiveUpsertByIdOperationSupport.java | 3 +-- .../data/couchbase/core/query/Query.java | 24 ++++++++++++++++++- .../repository/CouchbaseRepository.java | 3 +++ .../query/CouchbaseQueryMethod.java | 19 +++++++++++++++ .../query/N1qlRepositoryQueryExecutor.java | 17 +++++++++---- .../ReactiveN1qlRepositoryQueryExecutor.java | 14 ++++++++++- .../support/SimpleCouchbaseRepository.java | 5 ++++ ...hbaseTemplateKeyValueIntegrationTests.java | 3 +++ .../couchbase/domain/AirportRepository.java | 6 ++++- .../domain/ReactiveAirportRepository.java | 5 ++++ ...chbaseRepositoryQueryIntegrationTests.java | 8 ++----- ...chbaseRepositoryQueryIntegrationTests.java | 7 +++--- 17 files changed, 117 insertions(+), 28 deletions(-) diff --git a/src/main/java/org/springframework/data/couchbase/core/ExecutableFindByQueryOperation.java b/src/main/java/org/springframework/data/couchbase/core/ExecutableFindByQueryOperation.java index 2fc6e4280..5078d7019 100644 --- a/src/main/java/org/springframework/data/couchbase/core/ExecutableFindByQueryOperation.java +++ b/src/main/java/org/springframework/data/couchbase/core/ExecutableFindByQueryOperation.java @@ -126,7 +126,7 @@ interface FindByQueryConsistentWith extends FindByQueryWithQuery { * * @param scanConsistency the custom scan consistency to use for this query. */ - FindByQueryWithQuery consistentWith(QueryScanConsistency scanConsistency); + FindByQueryConsistentWith consistentWith(QueryScanConsistency scanConsistency); } diff --git a/src/main/java/org/springframework/data/couchbase/core/ExecutableFindByQueryOperationSupport.java b/src/main/java/org/springframework/data/couchbase/core/ExecutableFindByQueryOperationSupport.java index 46859bd89..7f569501b 100644 --- a/src/main/java/org/springframework/data/couchbase/core/ExecutableFindByQueryOperationSupport.java +++ b/src/main/java/org/springframework/data/couchbase/core/ExecutableFindByQueryOperationSupport.java @@ -21,6 +21,7 @@ import org.springframework.data.couchbase.core.query.Query; import com.couchbase.client.java.query.QueryScanConsistency; +import org.springframework.data.couchbase.core.ReactiveFindByQueryOperationSupport.ReactiveFindByQuerySupport; public class ExecutableFindByQueryOperationSupport implements ExecutableFindByQueryOperation { @@ -50,8 +51,7 @@ static class ExecutableFindByQuerySupport implements ExecutableFindByQuery this.template = template; this.domainType = domainType; this.query = query; - this.reactiveSupport = new ReactiveFindByQueryOperationSupport.ReactiveFindByQuerySupport( - template.reactive(), domainType, query, scanConsistency); + this.reactiveSupport = new ReactiveFindByQuerySupport(template.reactive(), domainType, query, scanConsistency); this.scanConsistency = scanConsistency; } @@ -72,11 +72,17 @@ public List all() { @Override public TerminatingFindByQuery matching(final Query query) { - return new ExecutableFindByQuerySupport<>(template, domainType, query, scanConsistency); + QueryScanConsistency scanCons; + if (query.getScanConsistency() != null) { + scanCons = query.getScanConsistency(); + } else { + scanCons = scanConsistency; + } + return new ExecutableFindByQuerySupport<>(template, domainType, query, scanCons); } @Override - public FindByQueryWithQuery consistentWith(final QueryScanConsistency scanConsistency) { + public FindByQueryConsistentWith consistentWith(final QueryScanConsistency scanConsistency) { return new ExecutableFindByQuerySupport<>(template, domainType, query, scanConsistency); } diff --git a/src/main/java/org/springframework/data/couchbase/core/ReactiveFindByQueryOperation.java b/src/main/java/org/springframework/data/couchbase/core/ReactiveFindByQueryOperation.java index 50e333a97..309431f9c 100644 --- a/src/main/java/org/springframework/data/couchbase/core/ReactiveFindByQueryOperation.java +++ b/src/main/java/org/springframework/data/couchbase/core/ReactiveFindByQueryOperation.java @@ -100,7 +100,7 @@ interface FindByQueryConsistentWith extends FindByQueryWithQuery { * * @param scanConsistency the custom scan consistency to use for this query. */ - FindByQueryWithQuery consistentWith(QueryScanConsistency scanConsistency); + FindByQueryConsistentWith consistentWith(QueryScanConsistency scanConsistency); } diff --git a/src/main/java/org/springframework/data/couchbase/core/ReactiveFindByQueryOperationSupport.java b/src/main/java/org/springframework/data/couchbase/core/ReactiveFindByQueryOperationSupport.java index 09f8d81ab..cf0017585 100644 --- a/src/main/java/org/springframework/data/couchbase/core/ReactiveFindByQueryOperationSupport.java +++ b/src/main/java/org/springframework/data/couchbase/core/ReactiveFindByQueryOperationSupport.java @@ -60,11 +60,17 @@ static class ReactiveFindByQuerySupport implements ReactiveFindByQuery { @Override public TerminatingFindByQuery matching(Query query) { - return new ReactiveFindByQuerySupport<>(template, domainType, query, scanConsistency); + QueryScanConsistency scanCons; + if (query.getScanConsistency() != null) { + scanCons = query.getScanConsistency(); + } else { + scanCons = scanConsistency; + } + return new ReactiveFindByQuerySupport<>(template, domainType, query, scanCons); } @Override - public FindByQueryWithQuery consistentWith(QueryScanConsistency scanConsistency) { + public FindByQueryConsistentWith consistentWith(QueryScanConsistency scanConsistency) { return new ReactiveFindByQuerySupport<>(template, domainType, query, scanConsistency); } diff --git a/src/main/java/org/springframework/data/couchbase/core/ReactiveInsertByIdOperationSupport.java b/src/main/java/org/springframework/data/couchbase/core/ReactiveInsertByIdOperationSupport.java index c977511f4..c76acf371 100644 --- a/src/main/java/org/springframework/data/couchbase/core/ReactiveInsertByIdOperationSupport.java +++ b/src/main/java/org/springframework/data/couchbase/core/ReactiveInsertByIdOperationSupport.java @@ -15,7 +15,6 @@ */ package org.springframework.data.couchbase.core; -import com.couchbase.client.java.kv.UpsertOptions; import org.springframework.data.couchbase.core.mapping.Document; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -98,7 +97,7 @@ private InsertOptions buildInsertOptions() { } else if (durabilityLevel != DurabilityLevel.NONE) { options.durability(durabilityLevel); } - if (expiry != null) { + if (expiry != null && ! expiry.isZero()) { options.expiry(expiry); } else if (domainType.isAnnotationPresent(Document.class)) { Document documentAnn = domainType.getAnnotation(Document.class); diff --git a/src/main/java/org/springframework/data/couchbase/core/ReactiveUpsertByIdOperationSupport.java b/src/main/java/org/springframework/data/couchbase/core/ReactiveUpsertByIdOperationSupport.java index bb07b2c77..9dee87766 100644 --- a/src/main/java/org/springframework/data/couchbase/core/ReactiveUpsertByIdOperationSupport.java +++ b/src/main/java/org/springframework/data/couchbase/core/ReactiveUpsertByIdOperationSupport.java @@ -15,7 +15,6 @@ */ package org.springframework.data.couchbase.core; -import com.couchbase.client.java.kv.Expiry; import org.springframework.data.couchbase.core.mapping.Document; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -98,7 +97,7 @@ private UpsertOptions buildUpsertOptions() { } else if (durabilityLevel != DurabilityLevel.NONE) { options.durability(durabilityLevel); } - if (expiry != null) { + if (expiry != null && !expiry.isZero()) { options.expiry(expiry); } else if (domainType.isAnnotationPresent(Document.class)) { Document documentAnn = domainType.getAnnotation(Document.class); diff --git a/src/main/java/org/springframework/data/couchbase/core/query/Query.java b/src/main/java/org/springframework/data/couchbase/core/query/Query.java index 5b154ec1f..0cff59ddf 100644 --- a/src/main/java/org/springframework/data/couchbase/core/query/Query.java +++ b/src/main/java/org/springframework/data/couchbase/core/query/Query.java @@ -47,6 +47,7 @@ public class Query { private long skip; private int limit; private Sort sort = Sort.unsorted(); + private QueryScanConsistency queryScanConsistency; static private final Pattern WHERE_PATTERN = Pattern.compile("\\sWHERE\\s"); @@ -127,6 +128,27 @@ public Query with(final Pageable pageable) { return with(pageable.getSort()); } + /** + * queryScanConsistency + * + * @return queryScanConsistency + */ + public QueryScanConsistency getScanConsistency() { + return queryScanConsistency; + } + + + /** + * Sets the given scan consistency on the {@link Query} instance. + * + * @param queryScanConsistency + * @return this + */ + public Query scanConsistency(final QueryScanConsistency queryScanConsistency) { + this.queryScanConsistency = queryScanConsistency; + return this; + } + /** * Adds a {@link Sort} to the {@link Query} instance. * @@ -270,7 +292,7 @@ StringBasedN1qlQueryParser.N1qlSpelValues getN1qlSpelValues(ReactiveCouchbaseTem } /** - * build QueryOptions forom parameters and scanConsistency + * build QueryOptions from parameters and scanConsistency * * @param scanConsistency * @return QueryOptions diff --git a/src/main/java/org/springframework/data/couchbase/repository/CouchbaseRepository.java b/src/main/java/org/springframework/data/couchbase/repository/CouchbaseRepository.java index aab74e924..5544d7e58 100644 --- a/src/main/java/org/springframework/data/couchbase/repository/CouchbaseRepository.java +++ b/src/main/java/org/springframework/data/couchbase/repository/CouchbaseRepository.java @@ -18,6 +18,7 @@ import java.util.List; +import com.couchbase.client.java.query.QueryScanConsistency; import org.springframework.data.domain.Sort; import org.springframework.data.repository.NoRepositoryBean; import org.springframework.data.repository.PagingAndSortingRepository; @@ -34,6 +35,8 @@ public interface CouchbaseRepository extends PagingAndSortingRepository findAll(Sort sort); + List findAll(QueryScanConsistency queryScanConsistency); + @Override List findAll(); diff --git a/src/main/java/org/springframework/data/couchbase/repository/query/CouchbaseQueryMethod.java b/src/main/java/org/springframework/data/couchbase/repository/query/CouchbaseQueryMethod.java index 43223010f..d230572ae 100644 --- a/src/main/java/org/springframework/data/couchbase/repository/query/CouchbaseQueryMethod.java +++ b/src/main/java/org/springframework/data/couchbase/repository/query/CouchbaseQueryMethod.java @@ -25,6 +25,7 @@ import org.springframework.data.couchbase.core.query.View; import org.springframework.data.couchbase.core.query.WithConsistency; import org.springframework.data.couchbase.repository.Query; +import org.springframework.data.couchbase.repository.ScanConsistency; import org.springframework.data.mapping.context.MappingContext; import org.springframework.data.projection.ProjectionFactory; import org.springframework.data.repository.core.RepositoryMetadata; @@ -152,6 +153,24 @@ public WithConsistency getConsistencyAnnotation() { return method.getAnnotation(WithConsistency.class); } + /** + * If the method has a @ScanConsistency annotation + * + * @return true if this has the @ScanConsistency annotation + */ + public boolean hasScanConsistencyAnnotation() { + return getScanConsistencyAnnotation() != null; + } + + /** + * ScanConsistency annotation + * + * @return the @ScanConsistency annotation + */ + public ScanConsistency getScanConsistencyAnnotation() { + return method.getAnnotation(ScanConsistency.class); + } + /** * Returns the query string declared in a {@link Query} annotation or {@literal null} if neither the annotation found * nor the attribute was specified. diff --git a/src/main/java/org/springframework/data/couchbase/repository/query/N1qlRepositoryQueryExecutor.java b/src/main/java/org/springframework/data/couchbase/repository/query/N1qlRepositoryQueryExecutor.java index d9d3eed39..9f19847cd 100644 --- a/src/main/java/org/springframework/data/couchbase/repository/query/N1qlRepositoryQueryExecutor.java +++ b/src/main/java/org/springframework/data/couchbase/repository/query/N1qlRepositoryQueryExecutor.java @@ -15,9 +15,7 @@ */ package org.springframework.data.couchbase.repository.query; -import java.util.ArrayList; -import java.util.List; - +import com.couchbase.client.java.query.QueryScanConsistency; import org.springframework.data.couchbase.core.CouchbaseOperations; import org.springframework.data.couchbase.core.ExecutableFindByQueryOperation; import org.springframework.data.couchbase.core.query.Query; @@ -65,7 +63,8 @@ public Object execute(final Object[] parameters) { final PartTree tree = new PartTree(queryMethod.getName(), domainClass); query = new N1qlQueryCreator(tree, accessor, queryMethod, operations.getConverter()).createQuery(); } - q = (ExecutableFindByQueryOperation.ExecutableFindByQuery) operations.findByQuery(domainClass).matching(query); + q = (ExecutableFindByQueryOperation.ExecutableFindByQuery) operations.findByQuery(domainClass) + .consistentWith(buildQueryScanConsistency()).matching(query); if (queryMethod.isCountQuery()) { return q.count(); } else if (queryMethod.isCollectionQuery()) { @@ -76,4 +75,14 @@ public Object execute(final Object[] parameters) { } + private QueryScanConsistency buildQueryScanConsistency() { + QueryScanConsistency scanConsistency = QueryScanConsistency.NOT_BOUNDED; + if (queryMethod.hasConsistencyAnnotation()) { + scanConsistency = queryMethod.getConsistencyAnnotation().value(); + } else if (queryMethod.hasScanConsistencyAnnotation()) { + scanConsistency = queryMethod.getScanConsistencyAnnotation().query(); + } + return scanConsistency; + } + } diff --git a/src/main/java/org/springframework/data/couchbase/repository/query/ReactiveN1qlRepositoryQueryExecutor.java b/src/main/java/org/springframework/data/couchbase/repository/query/ReactiveN1qlRepositoryQueryExecutor.java index 2a5409806..bcc11eeb1 100644 --- a/src/main/java/org/springframework/data/couchbase/repository/query/ReactiveN1qlRepositoryQueryExecutor.java +++ b/src/main/java/org/springframework/data/couchbase/repository/query/ReactiveN1qlRepositoryQueryExecutor.java @@ -15,6 +15,7 @@ */ package org.springframework.data.couchbase.repository.query; +import com.couchbase.client.java.query.QueryScanConsistency; import org.springframework.data.couchbase.core.ExecutableFindByQueryOperation; import org.springframework.data.couchbase.core.ReactiveFindByQueryOperation; import org.springframework.data.mapping.context.MappingContext; @@ -69,7 +70,8 @@ public Object execute(final Object[] parameters) { final PartTree tree = new PartTree(queryMethod.getName(), domainClass); query = new N1qlQueryCreator(tree, accessor, queryMethod, operations.getConverter()).createQuery(); } - q = (ReactiveFindByQueryOperation.ReactiveFindByQuery) operations.findByQuery(domainClass).matching(query); + q = (ReactiveFindByQueryOperation.ReactiveFindByQuery) operations.findByQuery(domainClass) + .consistentWith(buildQueryScanConsistency()).matching(query); if (queryMethod.isCountQuery()) { return q.count(); } else if (queryMethod.isCollectionQuery()) { @@ -79,4 +81,14 @@ public Object execute(final Object[] parameters) { } } + private QueryScanConsistency buildQueryScanConsistency() { + QueryScanConsistency scanConsistency = QueryScanConsistency.NOT_BOUNDED; + if (queryMethod.hasConsistencyAnnotation()) { + scanConsistency = queryMethod.getConsistencyAnnotation().value(); + } else if (queryMethod.hasScanConsistencyAnnotation()) { + scanConsistency = queryMethod.getScanConsistencyAnnotation().query(); + } + return scanConsistency; + } + } diff --git a/src/main/java/org/springframework/data/couchbase/repository/support/SimpleCouchbaseRepository.java b/src/main/java/org/springframework/data/couchbase/repository/support/SimpleCouchbaseRepository.java index f6b7ee7ca..94e336a50 100644 --- a/src/main/java/org/springframework/data/couchbase/repository/support/SimpleCouchbaseRepository.java +++ b/src/main/java/org/springframework/data/couchbase/repository/support/SimpleCouchbaseRepository.java @@ -147,6 +147,11 @@ public List findAll(final Sort sort) { return findAll(new Query().with(sort)); } + @Override + public List findAll(final QueryScanConsistency queryScanConsistency) { + return findAll(new Query().scanConsistency(queryScanConsistency)); + } + @Override public Page findAll(final Pageable pageable) { List results = findAll(new Query().with(pageable)); diff --git a/src/test/java/org/springframework/data/couchbase/core/CouchbaseTemplateKeyValueIntegrationTests.java b/src/test/java/org/springframework/data/couchbase/core/CouchbaseTemplateKeyValueIntegrationTests.java index 1ddf61037..9f0aa7825 100644 --- a/src/test/java/org/springframework/data/couchbase/core/CouchbaseTemplateKeyValueIntegrationTests.java +++ b/src/test/java/org/springframework/data/couchbase/core/CouchbaseTemplateKeyValueIntegrationTests.java @@ -55,6 +55,7 @@ class CouchbaseTemplateKeyValueIntegrationTests extends ClusterAwareIntegrationT private static CouchbaseClientFactory couchbaseClientFactory; private CouchbaseTemplate couchbaseTemplate; + private ReactiveCouchbaseTemplate reactiveCouchbaseTemplate; @BeforeAll static void beforeAll() { @@ -71,6 +72,7 @@ static void afterAll() throws IOException { void beforeEach() { ApplicationContext ac = new AnnotationConfigApplicationContext(Config.class); couchbaseTemplate = (CouchbaseTemplate) ac.getBean(COUCHBASE_TEMPLATE); + reactiveCouchbaseTemplate = (ReactiveCouchbaseTemplate) ac.getBean(REACTIVE_COUCHBASE_TEMPLATE); } @Test @@ -89,6 +91,7 @@ void upsertAndFindById() { assertEquals(user, found); couchbaseTemplate.removeById().one(user.getId()); + reactiveCouchbaseTemplate.replaceById(User.class).withDurability(PersistTo.ACTIVE, ReplicateTo.THREE).one(user); } @Test diff --git a/src/test/java/org/springframework/data/couchbase/domain/AirportRepository.java b/src/test/java/org/springframework/data/couchbase/domain/AirportRepository.java index 06ce1fec1..a441f0095 100644 --- a/src/test/java/org/springframework/data/couchbase/domain/AirportRepository.java +++ b/src/test/java/org/springframework/data/couchbase/domain/AirportRepository.java @@ -40,15 +40,19 @@ public interface AirportRepository extends PagingAndSortingRepository findAll(); @Override - @ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS) Airport save(Airport airport); @ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS) List findAllByIata(String iata); + @ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS) + Airport findByIata(String iata); + @Query("#{#n1ql.selectEntity} where iata = $1") + @ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS) List getAllByIata(String iata); + @ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS) long countByIataIn(String... iata); @ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS) diff --git a/src/test/java/org/springframework/data/couchbase/domain/ReactiveAirportRepository.java b/src/test/java/org/springframework/data/couchbase/domain/ReactiveAirportRepository.java index 54f56573e..5d707cf1d 100644 --- a/src/test/java/org/springframework/data/couchbase/domain/ReactiveAirportRepository.java +++ b/src/test/java/org/springframework/data/couchbase/domain/ReactiveAirportRepository.java @@ -16,6 +16,7 @@ package org.springframework.data.couchbase.domain; +import org.springframework.data.couchbase.repository.Query; import reactor.core.publisher.Flux; import org.springframework.data.couchbase.repository.ScanConsistency; @@ -44,6 +45,10 @@ public interface ReactiveAirportRepository extends ReactiveSortingRepository findAllByIata(String iata); + @Query("#{#n1ql.selectEntity} where iata = $1") + @ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS) + Flux getAllByIata(String iata); + @ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS) Mono countByIataIn(String... iatas); diff --git a/src/test/java/org/springframework/data/couchbase/repository/CouchbaseRepositoryQueryIntegrationTests.java b/src/test/java/org/springframework/data/couchbase/repository/CouchbaseRepositoryQueryIntegrationTests.java index c4ea6cd88..482187065 100644 --- a/src/test/java/org/springframework/data/couchbase/repository/CouchbaseRepositoryQueryIntegrationTests.java +++ b/src/test/java/org/springframework/data/couchbase/repository/CouchbaseRepositoryQueryIntegrationTests.java @@ -25,8 +25,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -93,7 +91,6 @@ void findByInjection() { airportRepository.save(vie); xxx = new Airport("airports::xxx", "xxx", "xxxx"); airportRepository.save(xxx); - sleep(1000); List airports; airports = airportRepository.findAllByIata("1\" or iata=iata or iata=\"1"); assertEquals(0, airports.size()); @@ -112,7 +109,6 @@ void findBySimpleProperty() { try { vie = new Airport("airports::vie", "vie", "loww"); airportRepository.save(vie); - sleep(1000); List airports = airportRepository.findAllByIata("vie"); assertEquals(vie.getId(), airports.get(0).getId()); } finally { @@ -170,7 +166,7 @@ void threadSafeParametersTest() throws Exception { iatas[i].toLowerCase(Locale.ROOT) /* lcao */); airportRepository.save(airport); } - sleep(1000); + for (int k = 0; k < 50; k++) { Callable[] suppliers = new Callable[iatas.length]; for (int i = 0; i < iatas.length; i++) { @@ -211,7 +207,7 @@ void threadSafeStringParametersTest() throws Exception { Airport airport = new Airport("airports::" + iatas[i], iatas[i] /*iata*/, iatas[i].toLowerCase() /* lcao */); airportRepository.save(airport); } - sleep(1000); + for (int k = 0; k < 100; k++) { Callable[] suppliers = new Callable[iatas.length]; for (int i = 0; i < iatas.length; i++) { diff --git a/src/test/java/org/springframework/data/couchbase/repository/ReactiveCouchbaseRepositoryQueryIntegrationTests.java b/src/test/java/org/springframework/data/couchbase/repository/ReactiveCouchbaseRepositoryQueryIntegrationTests.java index dd04aeb97..75dd831a7 100644 --- a/src/test/java/org/springframework/data/couchbase/repository/ReactiveCouchbaseRepositoryQueryIntegrationTests.java +++ b/src/test/java/org/springframework/data/couchbase/repository/ReactiveCouchbaseRepositoryQueryIntegrationTests.java @@ -88,9 +88,10 @@ void findBySimpleProperty() { try { vie = new Airport("airports::vie", "vie", "loww"); airportRepository.save(vie).block(); - List airports = airportRepository.findAllByIata("vie").collectList().block(); - // TODO - System.err.println(airports); + List airports1 = airportRepository.findAllByIata("vie").collectList().block(); + assertEquals(1,airports1.size()); + List airports2 = airportRepository.findAllByIata("vie").collectList().block(); + assertEquals(1,airports2.size()); } finally { airportRepository.delete(vie).block(); }