Skip to content

Added support of the read locks #1655

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@

import com.couchbase.client.java.kv.GetOptions;
import org.springframework.data.couchbase.core.support.WithExpiry;
import org.springframework.data.couchbase.core.support.WithLock;

/**
* Get Operations
*
* @author Christoph Strobl
* @author Tigran Babloyan
* @since 2.0
*/
public interface ExecutableFindByIdOperation {
Expand Down Expand Up @@ -132,11 +134,25 @@ interface FindByIdWithExpiry<T> extends FindByIdWithProjection<T>, WithExpiry<T>
FindByIdWithProjection<T> withExpiry(Duration expiry);
}

interface FindByIdWithLock<T> extends FindByIdWithExpiry<T>, WithLock<T> {
/**
* Fetches document and write-locks it for the given duration.
* <p>
* Note that the client does not enforce an upper limit on the {@link Duration} lockTime. The maximum lock time
* by default on the server is 30 seconds. Any value larger than 30 seconds will be capped down by the server to
* the default lock time, which is 15 seconds unless modified on the server side.
*
* @param lockDuration how long to write-lock the document for (any duration > 30s will be capped to server default of 15s).
*/
@Override
FindByIdWithExpiry<T> withLock(Duration lockDuration);
}

/**
* Provides methods for constructing query operations in a fluent way.
*
* @param <T> the entity type to use for the results
*/
interface ExecutableFindById<T> extends FindByIdWithExpiry<T> {}
interface ExecutableFindById<T> extends FindByIdWithLock<T> {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class ExecutableFindByIdOperationSupport implements ExecutableFindByIdOpe
@Override
public <T> ExecutableFindById<T> findById(Class<T> domainType) {
return new ExecutableFindByIdSupport<>(template, domainType, OptionsBuilder.getScopeFrom(domainType),
OptionsBuilder.getCollectionFrom(domainType),null, null, null);
OptionsBuilder.getCollectionFrom(domainType),null, null, null, null);
}

static class ExecutableFindByIdSupport<T> implements ExecutableFindById<T> {
Expand All @@ -49,19 +49,21 @@ static class ExecutableFindByIdSupport<T> implements ExecutableFindById<T> {
private final GetOptions options;
private final List<String> fields;
private final Duration expiry;
private final Duration lockDuration;
private final ReactiveFindByIdSupport<T> reactiveSupport;

ExecutableFindByIdSupport(CouchbaseTemplate template, Class<T> domainType, String scope, String collection,
GetOptions options, List<String> fields, Duration expiry) {
GetOptions options, List<String> fields, Duration expiry, Duration lockDuration) {
this.template = template;
this.domainType = domainType;
this.scope = scope;
this.collection = collection;
this.options = options;
this.fields = fields;
this.expiry = expiry;
this.lockDuration = lockDuration;
this.reactiveSupport = new ReactiveFindByIdSupport<>(template.reactive(), domainType, scope, collection, options,
fields, expiry, new NonReactiveSupportWrapper(template.support()));
fields, expiry, lockDuration, new NonReactiveSupportWrapper(template.support()));
}

@Override
Expand All @@ -77,31 +79,36 @@ public Collection<? extends T> all(final Collection<String> ids) {
@Override
public TerminatingFindById<T> withOptions(final GetOptions options) {
Assert.notNull(options, "Options must not be null.");
return new ExecutableFindByIdSupport<>(template, domainType, scope, collection, options, fields, expiry);
return new ExecutableFindByIdSupport<>(template, domainType, scope, collection, options, fields, expiry, lockDuration);
}

@Override
public FindByIdWithOptions<T> inCollection(final String collection) {
return new ExecutableFindByIdSupport<>(template, domainType, scope, collection != null ? collection : this.collection, options, fields, expiry);
return new ExecutableFindByIdSupport<>(template, domainType, scope, collection != null ? collection : this.collection, options, fields, expiry, lockDuration);
}

@Override
public FindByIdInCollection<T> inScope(final String scope) {
return new ExecutableFindByIdSupport<>(template, domainType, scope != null ? scope : this.scope, collection, options, fields, expiry);
return new ExecutableFindByIdSupport<>(template, domainType, scope != null ? scope : this.scope, collection, options, fields, expiry, lockDuration);
}

@Override
public FindByIdInScope<T> project(String... fields) {
Assert.notEmpty(fields, "Fields must not be null.");
return new ExecutableFindByIdSupport<>(template, domainType, scope, collection, options, Arrays.asList(fields), expiry);
return new ExecutableFindByIdSupport<>(template, domainType, scope, collection, options, Arrays.asList(fields), expiry, lockDuration);
}

@Override
public FindByIdWithProjection<T> withExpiry(final Duration expiry) {
return new ExecutableFindByIdSupport<>(template, domainType, scope, collection, options, fields,
expiry);
expiry, lockDuration);
}

@Override
public FindByIdWithExpiry<T> withLock(final Duration lockDuration) {
return new ExecutableFindByIdSupport<>(template, domainType, scope, collection, options, fields,
expiry, lockDuration);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.springframework.data.couchbase.core.support.InScope;
import org.springframework.data.couchbase.core.support.OneAndAllIdReactive;
import org.springframework.data.couchbase.core.support.WithExpiry;
import org.springframework.data.couchbase.core.support.WithLock;
import org.springframework.data.couchbase.core.support.WithGetOptions;
import org.springframework.data.couchbase.core.support.WithProjectionId;

Expand All @@ -34,6 +35,7 @@
* Get Operations
*
* @author Christoph Strobl
* @author Tigran Babloyan
* @since 2.0
*/
public interface ReactiveFindByIdOperation {
Expand Down Expand Up @@ -136,11 +138,25 @@ interface FindByIdWithExpiry<T> extends FindByIdWithProjection<T>, WithExpiry<T>
FindByIdWithProjection<T> withExpiry(Duration expiry);
}

interface FindByIdWithLock<T> extends FindByIdWithExpiry<T>, WithLock<T> {
/**
* Fetches document and write-locks it for the given duration.
* <p>
* Note that the client does not enforce an upper limit on the {@link Duration} lockTime. The maximum lock time
* by default on the server is 30 seconds. Any value larger than 30 seconds will be capped down by the server to
* the default lock time, which is 15 seconds unless modified on the server side.
*
* @param lockDuration how long to write-lock the document for (any duration > 30s will be capped to server default of 15s).
*/
@Override
FindByIdWithExpiry<T> withLock(Duration lockDuration);
}

/**
* Provides methods for constructing query operations in a fluent way.
*
* @param <T> the entity type to use for the results
*/
interface ReactiveFindById<T> extends FindByIdWithExpiry<T> {}
interface ReactiveFindById<T> extends FindByIdWithLock<T> {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
package org.springframework.data.couchbase.core;

import static com.couchbase.client.java.kv.GetAndTouchOptions.getAndTouchOptions;
import static com.couchbase.client.java.kv.GetAndLockOptions.getAndLockOptions;
import static com.couchbase.client.java.transactions.internal.ConverterUtil.makeCollectionIdentifier;

import com.couchbase.client.java.kv.GetAndLockOptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -26,6 +28,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -45,6 +48,7 @@
* {@link ReactiveFindByIdOperation} implementations for Couchbase.
*
* @author Michael Reiche
* @author Tigran Babloyan
*/
public class ReactiveFindByIdOperationSupport implements ReactiveFindByIdOperation {

Expand All @@ -58,7 +62,7 @@ public class ReactiveFindByIdOperationSupport implements ReactiveFindByIdOperati
@Override
public <T> ReactiveFindById<T> findById(Class<T> domainType) {
return new ReactiveFindByIdSupport<>(template, domainType, OptionsBuilder.getScopeFrom(domainType),
OptionsBuilder.getCollectionFrom(domainType), null, null, null, template.support());
OptionsBuilder.getCollectionFrom(domainType), null, null, null, null, template.support());
}

static class ReactiveFindByIdSupport<T> implements ReactiveFindById<T> {
Expand All @@ -71,11 +75,12 @@ static class ReactiveFindByIdSupport<T> implements ReactiveFindById<T> {
private final List<String> fields;
private final ReactiveTemplateSupport support;
private final Duration expiry;
private final Duration lockDuration;

private Duration expiryToUse;

ReactiveFindByIdSupport(ReactiveCouchbaseTemplate template, Class<T> domainType, String scope, String collection,
CommonOptions<?> options, List<String> fields, Duration expiry, ReactiveTemplateSupport support) {
CommonOptions<?> options, List<String> fields, Duration expiry, Duration lockDuration, ReactiveTemplateSupport support) {
this.template = template;
this.domainType = domainType;
this.scope = scope;
Expand All @@ -84,6 +89,7 @@ static class ReactiveFindByIdSupport<T> implements ReactiveFindById<T> {
this.fields = fields;
this.expiry = expiry;
this.support = support;
this.lockDuration = lockDuration;
}

@Override
Expand All @@ -99,8 +105,12 @@ public Mono<T> one(final Object id) {

Mono<T> reactiveEntity = TransactionalSupport.checkForTransactionInThreadLocalStorage().flatMap(ctxOpt -> {
if (!ctxOpt.isPresent()) {
if (pArgs.getOptions() instanceof GetAndTouchOptions) {
return rc.getAndTouch(id.toString(), expiryToUse, (GetAndTouchOptions) pArgs.getOptions())
if (pArgs.getOptions() instanceof GetAndTouchOptions options) {
return rc.getAndTouch(id.toString(), expiryToUse, options)
.flatMap(result -> support.decodeEntity(id, result.contentAs(String.class), result.cas(), domainType,
pArgs.getScope(), pArgs.getCollection(), null, null));
} else if (pArgs.getOptions() instanceof GetAndLockOptions options) {
return rc.getAndLock(id.toString(), Optional.of(lockDuration).orElse(Duration.ZERO), options)
.flatMap(result -> support.decodeEntity(id, result.contentAs(String.class), result.cas(), domainType,
pArgs.getScope(), pArgs.getCollection(), null, null));
} else {
Expand Down Expand Up @@ -132,6 +142,9 @@ public Mono<T> one(final Object id) {
}

private void rejectInvalidTransactionalOptions() {
if (this.lockDuration != null) {
throw new IllegalArgumentException("withLock is not supported in a transaction");
}
if (this.expiry != null) {
throw new IllegalArgumentException("withExpiry is not supported in a transaction");
}
Expand All @@ -151,39 +164,53 @@ public Flux<? extends T> all(final Collection<String> ids) {
@Override
public FindByIdInScope<T> withOptions(final GetOptions options) {
Assert.notNull(options, "Options must not be null.");
return new ReactiveFindByIdSupport<>(template, domainType, scope, collection, options, fields, expiry, support);
return new ReactiveFindByIdSupport<>(template, domainType, scope, collection, options, fields, expiry,
lockDuration, support);
}

@Override
public FindByIdWithOptions<T> inCollection(final String collection) {
return new ReactiveFindByIdSupport<>(template, domainType, scope,
collection != null ? collection : this.collection, options, fields, expiry, support);
collection != null ? collection : this.collection, options, fields, expiry, lockDuration, support);
}

@Override
public FindByIdInCollection<T> inScope(final String scope) {
return new ReactiveFindByIdSupport<>(template, domainType, scope != null ? scope : this.scope, collection,
options, fields, expiry, support);
options, fields, expiry, lockDuration, support);
}

@Override
public FindByIdInCollection<T> project(String... fields) {
Assert.notNull(fields, "Fields must not be null");
return new ReactiveFindByIdSupport<>(template, domainType, scope, collection, options, Arrays.asList(fields),
expiry, support);
expiry, lockDuration, support);
}

@Override
public FindByIdWithProjection<T> withExpiry(final Duration expiry) {
return new ReactiveFindByIdSupport<>(template, domainType, scope, collection, options, fields, expiry, support);
return new ReactiveFindByIdSupport<>(template, domainType, scope, collection, options, fields, expiry,
lockDuration, support);
}

@Override
public FindByIdWithExpiry<T> withLock(final Duration lockDuration) {
return new ReactiveFindByIdSupport<>(template, domainType, scope, collection, options, fields, expiry,
lockDuration, support);
}

private CommonOptions<?> initGetOptions() {
CommonOptions<?> getOptions;
final CouchbasePersistentEntity<?> entity = template.getConverter().getMappingContext()
.getRequiredPersistentEntity(domainType);
Boolean isTouchOnRead = entity.isTouchOnRead();
if (expiry != null || isTouchOnRead || options instanceof GetAndTouchOptions) {
if(lockDuration != null || options instanceof GetAndLockOptions) {
GetAndLockOptions gOptions = options != null ? (GetAndLockOptions) options : getAndLockOptions();
if (gOptions.build().transcoder() == null) {
gOptions.transcoder(RawJsonTranscoder.INSTANCE);
}
getOptions = gOptions;
} else if (expiry != null || isTouchOnRead || options instanceof GetAndTouchOptions) {
if (expiry != null) {
expiryToUse = expiry;
} else if (isTouchOnRead) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2020-2023 the original author or authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.couchbase.core.support;

import java.time.Duration;

/**
* A common interface for those that support withLock()
*
* @author Tigran Babloyan
* @param <R> - the entity class
*/
public interface WithLock<R> {
Object withLock(Duration lockDuration);

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import java.util.Set;
import java.util.UUID;

import com.couchbase.client.core.error.TimeoutException;
import com.couchbase.client.core.retry.RetryReason;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -80,6 +82,28 @@ public void beforeEach() {
couchbaseTemplate.removeByQuery(User.class).withConsistency(QueryScanConsistency.REQUEST_PLUS).all();
}

@Test
void findByIdWithLock() {
try {
User user = new User(UUID.randomUUID().toString(), "user1", "user1");

couchbaseTemplate.upsertById(User.class).one(user);

User foundUser = couchbaseTemplate.findById(User.class).withLock(Duration.ofSeconds(2)).one(user.getId());
user.setVersion(foundUser.getVersion());// version will have changed
assertEquals(user, foundUser);

TimeoutException exception = assertThrows(TimeoutException.class, () ->
couchbaseTemplate.upsertById(User.class).one(user)
);
assertTrue(exception.retryReasons().contains(RetryReason.KV_LOCKED), "should have been locked");
} finally {
sleepSecs(2);
couchbaseTemplate.removeByQuery(User.class).withConsistency(QueryScanConsistency.REQUEST_PLUS).all();
}

}

@Test
void findByIdWithExpiry() {
try {
Expand Down
Loading