Skip to content

DATACOUCH-605 - Support ScanConsistency in n1ql queries #267

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ interface FindByQueryConsistentWith<T> extends FindByQueryWithQuery<T> {
*
* @param scanConsistency the custom scan consistency to use for this query.
*/
FindByQueryWithQuery<T> consistentWith(QueryScanConsistency scanConsistency);
FindByQueryConsistentWith<T> consistentWith(QueryScanConsistency scanConsistency);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.springframework.data.couchbase.core.query.Query;

import com.couchbase.client.java.query.QueryScanConsistency;
import org.springframework.data.couchbase.core.ReactiveFindByQueryOperationSupport.ReactiveFindByQuerySupport;

public class ExecutableFindByQueryOperationSupport implements ExecutableFindByQueryOperation {

Expand Down Expand Up @@ -50,8 +51,7 @@ static class ExecutableFindByQuerySupport<T> implements ExecutableFindByQuery<T>
this.template = template;
this.domainType = domainType;
this.query = query;
this.reactiveSupport = new ReactiveFindByQueryOperationSupport.ReactiveFindByQuerySupport<T>(
template.reactive(), domainType, query, scanConsistency);
this.reactiveSupport = new ReactiveFindByQuerySupport<T>(template.reactive(), domainType, query, scanConsistency);
this.scanConsistency = scanConsistency;
}

Expand All @@ -72,11 +72,17 @@ public List<T> all() {

@Override
public TerminatingFindByQuery<T> matching(final Query query) {
return new ExecutableFindByQuerySupport<>(template, domainType, query, scanConsistency);
QueryScanConsistency scanCons;
if (query.getScanConsistency() != null) {
scanCons = query.getScanConsistency();
} else {
scanCons = scanConsistency;
}
return new ExecutableFindByQuerySupport<>(template, domainType, query, scanCons);
}

@Override
public FindByQueryWithQuery<T> consistentWith(final QueryScanConsistency scanConsistency) {
public FindByQueryConsistentWith<T> consistentWith(final QueryScanConsistency scanConsistency) {
return new ExecutableFindByQuerySupport<>(template, domainType, query, scanConsistency);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ interface FindByQueryConsistentWith<T> extends FindByQueryWithQuery<T> {
*
* @param scanConsistency the custom scan consistency to use for this query.
*/
FindByQueryWithQuery<T> consistentWith(QueryScanConsistency scanConsistency);
FindByQueryConsistentWith<T> consistentWith(QueryScanConsistency scanConsistency);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,17 @@ static class ReactiveFindByQuerySupport<T> implements ReactiveFindByQuery<T> {

@Override
public TerminatingFindByQuery<T> matching(Query query) {
return new ReactiveFindByQuerySupport<>(template, domainType, query, scanConsistency);
QueryScanConsistency scanCons;
if (query.getScanConsistency() != null) {
scanCons = query.getScanConsistency();
} else {
scanCons = scanConsistency;
}
return new ReactiveFindByQuerySupport<>(template, domainType, query, scanCons);
}

@Override
public FindByQueryWithQuery<T> consistentWith(QueryScanConsistency scanConsistency) {
public FindByQueryConsistentWith<T> consistentWith(QueryScanConsistency scanConsistency) {
return new ReactiveFindByQuerySupport<>(template, domainType, query, scanConsistency);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package org.springframework.data.couchbase.core;

import com.couchbase.client.java.kv.UpsertOptions;
import org.springframework.data.couchbase.core.mapping.Document;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -98,7 +97,7 @@ private InsertOptions buildInsertOptions() {
} else if (durabilityLevel != DurabilityLevel.NONE) {
options.durability(durabilityLevel);
}
if (expiry != null) {
if (expiry != null && ! expiry.isZero()) {
options.expiry(expiry);
} else if (domainType.isAnnotationPresent(Document.class)) {
Document documentAnn = domainType.getAnnotation(Document.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package org.springframework.data.couchbase.core;

import com.couchbase.client.java.kv.Expiry;
import org.springframework.data.couchbase.core.mapping.Document;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -98,7 +97,7 @@ private UpsertOptions buildUpsertOptions() {
} else if (durabilityLevel != DurabilityLevel.NONE) {
options.durability(durabilityLevel);
}
if (expiry != null) {
if (expiry != null && !expiry.isZero()) {
options.expiry(expiry);
} else if (domainType.isAnnotationPresent(Document.class)) {
Document documentAnn = domainType.getAnnotation(Document.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class Query {
private long skip;
private int limit;
private Sort sort = Sort.unsorted();
private QueryScanConsistency queryScanConsistency;

static private final Pattern WHERE_PATTERN = Pattern.compile("\\sWHERE\\s");

Expand Down Expand Up @@ -127,6 +128,27 @@ public Query with(final Pageable pageable) {
return with(pageable.getSort());
}

/**
* queryScanConsistency
*
* @return queryScanConsistency
*/
public QueryScanConsistency getScanConsistency() {
return queryScanConsistency;
}


/**
* Sets the given scan consistency on the {@link Query} instance.
*
* @param queryScanConsistency
* @return this
*/
public Query scanConsistency(final QueryScanConsistency queryScanConsistency) {
this.queryScanConsistency = queryScanConsistency;
return this;
}

/**
* Adds a {@link Sort} to the {@link Query} instance.
*
Expand Down Expand Up @@ -270,7 +292,7 @@ StringBasedN1qlQueryParser.N1qlSpelValues getN1qlSpelValues(ReactiveCouchbaseTem
}

/**
* build QueryOptions forom parameters and scanConsistency
* build QueryOptions from parameters and scanConsistency
*
* @param scanConsistency
* @return QueryOptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.List;

import com.couchbase.client.java.query.QueryScanConsistency;
import org.springframework.data.domain.Sort;
import org.springframework.data.repository.NoRepositoryBean;
import org.springframework.data.repository.PagingAndSortingRepository;
Expand All @@ -34,6 +35,8 @@ public interface CouchbaseRepository<T, ID> extends PagingAndSortingRepository<T
@Override
List<T> findAll(Sort sort);

List<T> findAll(QueryScanConsistency queryScanConsistency);

@Override
List<T> findAll();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.springframework.data.couchbase.core.query.View;
import org.springframework.data.couchbase.core.query.WithConsistency;
import org.springframework.data.couchbase.repository.Query;
import org.springframework.data.couchbase.repository.ScanConsistency;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.data.projection.ProjectionFactory;
import org.springframework.data.repository.core.RepositoryMetadata;
Expand Down Expand Up @@ -152,6 +153,24 @@ public WithConsistency getConsistencyAnnotation() {
return method.getAnnotation(WithConsistency.class);
}

/**
* If the method has a @ScanConsistency annotation
*
* @return true if this has the @ScanConsistency annotation
*/
public boolean hasScanConsistencyAnnotation() {
return getScanConsistencyAnnotation() != null;
}

/**
* ScanConsistency annotation
*
* @return the @ScanConsistency annotation
*/
public ScanConsistency getScanConsistencyAnnotation() {
return method.getAnnotation(ScanConsistency.class);
}

/**
* Returns the query string declared in a {@link Query} annotation or {@literal null} if neither the annotation found
* nor the attribute was specified.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
*/
package org.springframework.data.couchbase.repository.query;

import java.util.ArrayList;
import java.util.List;

import com.couchbase.client.java.query.QueryScanConsistency;
import org.springframework.data.couchbase.core.CouchbaseOperations;
import org.springframework.data.couchbase.core.ExecutableFindByQueryOperation;
import org.springframework.data.couchbase.core.query.Query;
Expand Down Expand Up @@ -65,7 +63,8 @@ public Object execute(final Object[] parameters) {
final PartTree tree = new PartTree(queryMethod.getName(), domainClass);
query = new N1qlQueryCreator(tree, accessor, queryMethod, operations.getConverter()).createQuery();
}
q = (ExecutableFindByQueryOperation.ExecutableFindByQuery) operations.findByQuery(domainClass).matching(query);
q = (ExecutableFindByQueryOperation.ExecutableFindByQuery) operations.findByQuery(domainClass)
.consistentWith(buildQueryScanConsistency()).matching(query);
if (queryMethod.isCountQuery()) {
return q.count();
} else if (queryMethod.isCollectionQuery()) {
Expand All @@ -76,4 +75,14 @@ public Object execute(final Object[] parameters) {

}

private QueryScanConsistency buildQueryScanConsistency() {
QueryScanConsistency scanConsistency = QueryScanConsistency.NOT_BOUNDED;
if (queryMethod.hasConsistencyAnnotation()) {
scanConsistency = queryMethod.getConsistencyAnnotation().value();
} else if (queryMethod.hasScanConsistencyAnnotation()) {
scanConsistency = queryMethod.getScanConsistencyAnnotation().query();
}
return scanConsistency;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.springframework.data.couchbase.repository.query;

import com.couchbase.client.java.query.QueryScanConsistency;
import org.springframework.data.couchbase.core.ExecutableFindByQueryOperation;
import org.springframework.data.couchbase.core.ReactiveFindByQueryOperation;
import org.springframework.data.mapping.context.MappingContext;
Expand Down Expand Up @@ -69,7 +70,8 @@ public Object execute(final Object[] parameters) {
final PartTree tree = new PartTree(queryMethod.getName(), domainClass);
query = new N1qlQueryCreator(tree, accessor, queryMethod, operations.getConverter()).createQuery();
}
q = (ReactiveFindByQueryOperation.ReactiveFindByQuery) operations.findByQuery(domainClass).matching(query);
q = (ReactiveFindByQueryOperation.ReactiveFindByQuery) operations.findByQuery(domainClass)
.consistentWith(buildQueryScanConsistency()).matching(query);
if (queryMethod.isCountQuery()) {
return q.count();
} else if (queryMethod.isCollectionQuery()) {
Expand All @@ -79,4 +81,14 @@ public Object execute(final Object[] parameters) {
}
}

private QueryScanConsistency buildQueryScanConsistency() {
QueryScanConsistency scanConsistency = QueryScanConsistency.NOT_BOUNDED;
if (queryMethod.hasConsistencyAnnotation()) {
scanConsistency = queryMethod.getConsistencyAnnotation().value();
} else if (queryMethod.hasScanConsistencyAnnotation()) {
scanConsistency = queryMethod.getScanConsistencyAnnotation().query();
}
return scanConsistency;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ public List<T> findAll(final Sort sort) {
return findAll(new Query().with(sort));
}

@Override
public List<T> findAll(final QueryScanConsistency queryScanConsistency) {
return findAll(new Query().scanConsistency(queryScanConsistency));
}

@Override
public Page<T> findAll(final Pageable pageable) {
List<T> results = findAll(new Query().with(pageable));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class CouchbaseTemplateKeyValueIntegrationTests extends ClusterAwareIntegrationT

private static CouchbaseClientFactory couchbaseClientFactory;
private CouchbaseTemplate couchbaseTemplate;
private ReactiveCouchbaseTemplate reactiveCouchbaseTemplate;

@BeforeAll
static void beforeAll() {
Expand All @@ -71,6 +72,7 @@ static void afterAll() throws IOException {
void beforeEach() {
ApplicationContext ac = new AnnotationConfigApplicationContext(Config.class);
couchbaseTemplate = (CouchbaseTemplate) ac.getBean(COUCHBASE_TEMPLATE);
reactiveCouchbaseTemplate = (ReactiveCouchbaseTemplate) ac.getBean(REACTIVE_COUCHBASE_TEMPLATE);
}

@Test
Expand All @@ -89,6 +91,7 @@ void upsertAndFindById() {
assertEquals(user, found);

couchbaseTemplate.removeById().one(user.getId());
reactiveCouchbaseTemplate.replaceById(User.class).withDurability(PersistTo.ACTIVE, ReplicateTo.THREE).one(user);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,19 @@ public interface AirportRepository extends PagingAndSortingRepository<Airport, S
Iterable<Airport> findAll();

@Override
@ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS)
Airport save(Airport airport);

@ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS)
List<Airport> findAllByIata(String iata);

@ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS)
Airport findByIata(String iata);

@Query("#{#n1ql.selectEntity} where iata = $1")
@ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS)
List<Airport> getAllByIata(String iata);

@ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS)
long countByIataIn(String... iata);

@ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.data.couchbase.domain;

import org.springframework.data.couchbase.repository.Query;
import reactor.core.publisher.Flux;

import org.springframework.data.couchbase.repository.ScanConsistency;
Expand Down Expand Up @@ -44,6 +45,10 @@ public interface ReactiveAirportRepository extends ReactiveSortingRepository<Air
@ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS)
Flux<Airport> findAllByIata(String iata);

@Query("#{#n1ql.selectEntity} where iata = $1")
@ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS)
Flux<Airport> getAllByIata(String iata);

@ScanConsistency(query = QueryScanConsistency.REQUEST_PLUS)
Mono<Long> countByIataIn(String... iatas);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -93,7 +91,6 @@ void findByInjection() {
airportRepository.save(vie);
xxx = new Airport("airports::xxx", "xxx", "xxxx");
airportRepository.save(xxx);
sleep(1000);
List<Airport> airports;
airports = airportRepository.findAllByIata("1\" or iata=iata or iata=\"1");
assertEquals(0, airports.size());
Expand All @@ -112,7 +109,6 @@ void findBySimpleProperty() {
try {
vie = new Airport("airports::vie", "vie", "loww");
airportRepository.save(vie);
sleep(1000);
List<Airport> airports = airportRepository.findAllByIata("vie");
assertEquals(vie.getId(), airports.get(0).getId());
} finally {
Expand Down Expand Up @@ -170,7 +166,7 @@ void threadSafeParametersTest() throws Exception {
iatas[i].toLowerCase(Locale.ROOT) /* lcao */);
airportRepository.save(airport);
}
sleep(1000);

for (int k = 0; k < 50; k++) {
Callable<Boolean>[] suppliers = new Callable[iatas.length];
for (int i = 0; i < iatas.length; i++) {
Expand Down Expand Up @@ -211,7 +207,7 @@ void threadSafeStringParametersTest() throws Exception {
Airport airport = new Airport("airports::" + iatas[i], iatas[i] /*iata*/, iatas[i].toLowerCase() /* lcao */);
airportRepository.save(airport);
}
sleep(1000);

for (int k = 0; k < 100; k++) {
Callable<Boolean>[] suppliers = new Callable[iatas.length];
for (int i = 0; i < iatas.length; i++) {
Expand Down
Loading