diff --git a/src/main/java/org/springframework/data/couchbase/core/ExecutableFindByIdOperation.java b/src/main/java/org/springframework/data/couchbase/core/ExecutableFindByIdOperation.java index 58838d921..5c7f6095b 100644 --- a/src/main/java/org/springframework/data/couchbase/core/ExecutableFindByIdOperation.java +++ b/src/main/java/org/springframework/data/couchbase/core/ExecutableFindByIdOperation.java @@ -26,11 +26,13 @@ import com.couchbase.client.java.kv.GetOptions; import org.springframework.data.couchbase.core.support.WithExpiry; +import org.springframework.data.couchbase.core.support.WithLock; /** * Get Operations * * @author Christoph Strobl + * @author Tigran Babloyan * @since 2.0 */ public interface ExecutableFindByIdOperation { @@ -132,11 +134,25 @@ interface FindByIdWithExpiry extends FindByIdWithProjection, WithExpiry FindByIdWithProjection withExpiry(Duration expiry); } + interface FindByIdWithLock extends FindByIdWithExpiry, WithLock { + /** + * Fetches document and write-locks it for the given duration. + *

+ * Note that the client does not enforce an upper limit on the {@link Duration} lockTime. The maximum lock time + * by default on the server is 30 seconds. Any value larger than 30 seconds will be capped down by the server to + * the default lock time, which is 15 seconds unless modified on the server side. + * + * @param lockDuration how long to write-lock the document for (any duration > 30s will be capped to server default of 15s). + */ + @Override + FindByIdWithExpiry withLock(Duration lockDuration); + } + /** * Provides methods for constructing query operations in a fluent way. * * @param the entity type to use for the results */ - interface ExecutableFindById extends FindByIdWithExpiry {} + interface ExecutableFindById extends FindByIdWithLock {} } diff --git a/src/main/java/org/springframework/data/couchbase/core/ExecutableFindByIdOperationSupport.java b/src/main/java/org/springframework/data/couchbase/core/ExecutableFindByIdOperationSupport.java index 8a272bfcb..22b95284e 100644 --- a/src/main/java/org/springframework/data/couchbase/core/ExecutableFindByIdOperationSupport.java +++ b/src/main/java/org/springframework/data/couchbase/core/ExecutableFindByIdOperationSupport.java @@ -37,7 +37,7 @@ public class ExecutableFindByIdOperationSupport implements ExecutableFindByIdOpe @Override public ExecutableFindById findById(Class domainType) { return new ExecutableFindByIdSupport<>(template, domainType, OptionsBuilder.getScopeFrom(domainType), - OptionsBuilder.getCollectionFrom(domainType),null, null, null); + OptionsBuilder.getCollectionFrom(domainType),null, null, null, null); } static class ExecutableFindByIdSupport implements ExecutableFindById { @@ -49,10 +49,11 @@ static class ExecutableFindByIdSupport implements ExecutableFindById { private final GetOptions options; private final List fields; private final Duration expiry; + private final Duration lockDuration; private final ReactiveFindByIdSupport reactiveSupport; ExecutableFindByIdSupport(CouchbaseTemplate template, Class domainType, String scope, String collection, - GetOptions options, List fields, Duration expiry) { + GetOptions options, List fields, Duration expiry, Duration lockDuration) { this.template = template; this.domainType = domainType; this.scope = scope; @@ -60,8 +61,9 @@ static class ExecutableFindByIdSupport implements ExecutableFindById { this.options = options; this.fields = fields; this.expiry = expiry; + this.lockDuration = lockDuration; this.reactiveSupport = new ReactiveFindByIdSupport<>(template.reactive(), domainType, scope, collection, options, - fields, expiry, new NonReactiveSupportWrapper(template.support())); + fields, expiry, lockDuration, new NonReactiveSupportWrapper(template.support())); } @Override @@ -77,31 +79,36 @@ public Collection all(final Collection ids) { @Override public TerminatingFindById withOptions(final GetOptions options) { Assert.notNull(options, "Options must not be null."); - return new ExecutableFindByIdSupport<>(template, domainType, scope, collection, options, fields, expiry); + return new ExecutableFindByIdSupport<>(template, domainType, scope, collection, options, fields, expiry, lockDuration); } @Override public FindByIdWithOptions inCollection(final String collection) { - return new ExecutableFindByIdSupport<>(template, domainType, scope, collection != null ? collection : this.collection, options, fields, expiry); + return new ExecutableFindByIdSupport<>(template, domainType, scope, collection != null ? collection : this.collection, options, fields, expiry, lockDuration); } @Override public FindByIdInCollection inScope(final String scope) { - return new ExecutableFindByIdSupport<>(template, domainType, scope != null ? scope : this.scope, collection, options, fields, expiry); + return new ExecutableFindByIdSupport<>(template, domainType, scope != null ? scope : this.scope, collection, options, fields, expiry, lockDuration); } @Override public FindByIdInScope project(String... fields) { Assert.notEmpty(fields, "Fields must not be null."); - return new ExecutableFindByIdSupport<>(template, domainType, scope, collection, options, Arrays.asList(fields), expiry); + return new ExecutableFindByIdSupport<>(template, domainType, scope, collection, options, Arrays.asList(fields), expiry, lockDuration); } @Override public FindByIdWithProjection withExpiry(final Duration expiry) { return new ExecutableFindByIdSupport<>(template, domainType, scope, collection, options, fields, - expiry); + expiry, lockDuration); } + @Override + public FindByIdWithExpiry withLock(final Duration lockDuration) { + return new ExecutableFindByIdSupport<>(template, domainType, scope, collection, options, fields, + expiry, lockDuration); + } } } diff --git a/src/main/java/org/springframework/data/couchbase/core/ReactiveFindByIdOperation.java b/src/main/java/org/springframework/data/couchbase/core/ReactiveFindByIdOperation.java index 6ec509233..59ae53fce 100644 --- a/src/main/java/org/springframework/data/couchbase/core/ReactiveFindByIdOperation.java +++ b/src/main/java/org/springframework/data/couchbase/core/ReactiveFindByIdOperation.java @@ -25,6 +25,7 @@ import org.springframework.data.couchbase.core.support.InScope; import org.springframework.data.couchbase.core.support.OneAndAllIdReactive; import org.springframework.data.couchbase.core.support.WithExpiry; +import org.springframework.data.couchbase.core.support.WithLock; import org.springframework.data.couchbase.core.support.WithGetOptions; import org.springframework.data.couchbase.core.support.WithProjectionId; @@ -34,6 +35,7 @@ * Get Operations * * @author Christoph Strobl + * @author Tigran Babloyan * @since 2.0 */ public interface ReactiveFindByIdOperation { @@ -136,11 +138,25 @@ interface FindByIdWithExpiry extends FindByIdWithProjection, WithExpiry FindByIdWithProjection withExpiry(Duration expiry); } + interface FindByIdWithLock extends FindByIdWithExpiry, WithLock { + /** + * Fetches document and write-locks it for the given duration. + *

+ * Note that the client does not enforce an upper limit on the {@link Duration} lockTime. The maximum lock time + * by default on the server is 30 seconds. Any value larger than 30 seconds will be capped down by the server to + * the default lock time, which is 15 seconds unless modified on the server side. + * + * @param lockDuration how long to write-lock the document for (any duration > 30s will be capped to server default of 15s). + */ + @Override + FindByIdWithExpiry withLock(Duration lockDuration); + } + /** * Provides methods for constructing query operations in a fluent way. * * @param the entity type to use for the results */ - interface ReactiveFindById extends FindByIdWithExpiry {} + interface ReactiveFindById extends FindByIdWithLock {} } diff --git a/src/main/java/org/springframework/data/couchbase/core/ReactiveFindByIdOperationSupport.java b/src/main/java/org/springframework/data/couchbase/core/ReactiveFindByIdOperationSupport.java index ddb7358ae..81fcf6a36 100644 --- a/src/main/java/org/springframework/data/couchbase/core/ReactiveFindByIdOperationSupport.java +++ b/src/main/java/org/springframework/data/couchbase/core/ReactiveFindByIdOperationSupport.java @@ -16,8 +16,10 @@ package org.springframework.data.couchbase.core; import static com.couchbase.client.java.kv.GetAndTouchOptions.getAndTouchOptions; +import static com.couchbase.client.java.kv.GetAndLockOptions.getAndLockOptions; import static com.couchbase.client.java.transactions.internal.ConverterUtil.makeCollectionIdentifier; +import com.couchbase.client.java.kv.GetAndLockOptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -26,6 +28,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +48,7 @@ * {@link ReactiveFindByIdOperation} implementations for Couchbase. * * @author Michael Reiche + * @author Tigran Babloyan */ public class ReactiveFindByIdOperationSupport implements ReactiveFindByIdOperation { @@ -58,7 +62,7 @@ public class ReactiveFindByIdOperationSupport implements ReactiveFindByIdOperati @Override public ReactiveFindById findById(Class domainType) { return new ReactiveFindByIdSupport<>(template, domainType, OptionsBuilder.getScopeFrom(domainType), - OptionsBuilder.getCollectionFrom(domainType), null, null, null, template.support()); + OptionsBuilder.getCollectionFrom(domainType), null, null, null, null, template.support()); } static class ReactiveFindByIdSupport implements ReactiveFindById { @@ -71,11 +75,12 @@ static class ReactiveFindByIdSupport implements ReactiveFindById { private final List fields; private final ReactiveTemplateSupport support; private final Duration expiry; + private final Duration lockDuration; private Duration expiryToUse; ReactiveFindByIdSupport(ReactiveCouchbaseTemplate template, Class domainType, String scope, String collection, - CommonOptions options, List fields, Duration expiry, ReactiveTemplateSupport support) { + CommonOptions options, List fields, Duration expiry, Duration lockDuration, ReactiveTemplateSupport support) { this.template = template; this.domainType = domainType; this.scope = scope; @@ -84,6 +89,7 @@ static class ReactiveFindByIdSupport implements ReactiveFindById { this.fields = fields; this.expiry = expiry; this.support = support; + this.lockDuration = lockDuration; } @Override @@ -99,8 +105,12 @@ public Mono one(final Object id) { Mono reactiveEntity = TransactionalSupport.checkForTransactionInThreadLocalStorage().flatMap(ctxOpt -> { if (!ctxOpt.isPresent()) { - if (pArgs.getOptions() instanceof GetAndTouchOptions) { - return rc.getAndTouch(id.toString(), expiryToUse, (GetAndTouchOptions) pArgs.getOptions()) + if (pArgs.getOptions() instanceof GetAndTouchOptions options) { + return rc.getAndTouch(id.toString(), expiryToUse, options) + .flatMap(result -> support.decodeEntity(id, result.contentAs(String.class), result.cas(), domainType, + pArgs.getScope(), pArgs.getCollection(), null, null)); + } else if (pArgs.getOptions() instanceof GetAndLockOptions options) { + return rc.getAndLock(id.toString(), Optional.of(lockDuration).orElse(Duration.ZERO), options) .flatMap(result -> support.decodeEntity(id, result.contentAs(String.class), result.cas(), domainType, pArgs.getScope(), pArgs.getCollection(), null, null)); } else { @@ -132,6 +142,9 @@ public Mono one(final Object id) { } private void rejectInvalidTransactionalOptions() { + if (this.lockDuration != null) { + throw new IllegalArgumentException("withLock is not supported in a transaction"); + } if (this.expiry != null) { throw new IllegalArgumentException("withExpiry is not supported in a transaction"); } @@ -151,31 +164,39 @@ public Flux all(final Collection ids) { @Override public FindByIdInScope withOptions(final GetOptions options) { Assert.notNull(options, "Options must not be null."); - return new ReactiveFindByIdSupport<>(template, domainType, scope, collection, options, fields, expiry, support); + return new ReactiveFindByIdSupport<>(template, domainType, scope, collection, options, fields, expiry, + lockDuration, support); } @Override public FindByIdWithOptions inCollection(final String collection) { return new ReactiveFindByIdSupport<>(template, domainType, scope, - collection != null ? collection : this.collection, options, fields, expiry, support); + collection != null ? collection : this.collection, options, fields, expiry, lockDuration, support); } @Override public FindByIdInCollection inScope(final String scope) { return new ReactiveFindByIdSupport<>(template, domainType, scope != null ? scope : this.scope, collection, - options, fields, expiry, support); + options, fields, expiry, lockDuration, support); } @Override public FindByIdInCollection project(String... fields) { Assert.notNull(fields, "Fields must not be null"); return new ReactiveFindByIdSupport<>(template, domainType, scope, collection, options, Arrays.asList(fields), - expiry, support); + expiry, lockDuration, support); } @Override public FindByIdWithProjection withExpiry(final Duration expiry) { - return new ReactiveFindByIdSupport<>(template, domainType, scope, collection, options, fields, expiry, support); + return new ReactiveFindByIdSupport<>(template, domainType, scope, collection, options, fields, expiry, + lockDuration, support); + } + + @Override + public FindByIdWithExpiry withLock(final Duration lockDuration) { + return new ReactiveFindByIdSupport<>(template, domainType, scope, collection, options, fields, expiry, + lockDuration, support); } private CommonOptions initGetOptions() { @@ -183,7 +204,13 @@ private CommonOptions initGetOptions() { final CouchbasePersistentEntity entity = template.getConverter().getMappingContext() .getRequiredPersistentEntity(domainType); Boolean isTouchOnRead = entity.isTouchOnRead(); - if (expiry != null || isTouchOnRead || options instanceof GetAndTouchOptions) { + if(lockDuration != null || options instanceof GetAndLockOptions) { + GetAndLockOptions gOptions = options != null ? (GetAndLockOptions) options : getAndLockOptions(); + if (gOptions.build().transcoder() == null) { + gOptions.transcoder(RawJsonTranscoder.INSTANCE); + } + getOptions = gOptions; + } else if (expiry != null || isTouchOnRead || options instanceof GetAndTouchOptions) { if (expiry != null) { expiryToUse = expiry; } else if (isTouchOnRead) { diff --git a/src/main/java/org/springframework/data/couchbase/core/support/WithLock.java b/src/main/java/org/springframework/data/couchbase/core/support/WithLock.java new file mode 100644 index 000000000..e23bb6b97 --- /dev/null +++ b/src/main/java/org/springframework/data/couchbase/core/support/WithLock.java @@ -0,0 +1,29 @@ +/* + * Copyright 2020-2023 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.couchbase.core.support; + +import java.time.Duration; + +/** + * A common interface for those that support withLock() + * + * @author Tigran Babloyan + * @param - the entity class + */ +public interface WithLock { + Object withLock(Duration lockDuration); + +} diff --git a/src/test/java/org/springframework/data/couchbase/core/CouchbaseTemplateKeyValueIntegrationTests.java b/src/test/java/org/springframework/data/couchbase/core/CouchbaseTemplateKeyValueIntegrationTests.java index 0c2a57333..7c5e68923 100644 --- a/src/test/java/org/springframework/data/couchbase/core/CouchbaseTemplateKeyValueIntegrationTests.java +++ b/src/test/java/org/springframework/data/couchbase/core/CouchbaseTemplateKeyValueIntegrationTests.java @@ -35,6 +35,8 @@ import java.util.Set; import java.util.UUID; +import com.couchbase.client.core.error.TimeoutException; +import com.couchbase.client.core.retry.RetryReason; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -80,6 +82,28 @@ public void beforeEach() { couchbaseTemplate.removeByQuery(User.class).withConsistency(QueryScanConsistency.REQUEST_PLUS).all(); } + @Test + void findByIdWithLock() { + try { + User user = new User(UUID.randomUUID().toString(), "user1", "user1"); + + couchbaseTemplate.upsertById(User.class).one(user); + + User foundUser = couchbaseTemplate.findById(User.class).withLock(Duration.ofSeconds(2)).one(user.getId()); + user.setVersion(foundUser.getVersion());// version will have changed + assertEquals(user, foundUser); + + TimeoutException exception = assertThrows(TimeoutException.class, () -> + couchbaseTemplate.upsertById(User.class).one(user) + ); + assertTrue(exception.retryReasons().contains(RetryReason.KV_LOCKED), "should have been locked"); + } finally { + sleepSecs(2); + couchbaseTemplate.removeByQuery(User.class).withConsistency(QueryScanConsistency.REQUEST_PLUS).all(); + } + + } + @Test void findByIdWithExpiry() { try { diff --git a/src/test/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplateKeyValueIntegrationTests.java b/src/test/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplateKeyValueIntegrationTests.java index 06ee0178c..5767d9d8a 100644 --- a/src/test/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplateKeyValueIntegrationTests.java +++ b/src/test/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplateKeyValueIntegrationTests.java @@ -36,6 +36,8 @@ import java.util.Set; import java.util.UUID; +import com.couchbase.client.core.error.TimeoutException; +import com.couchbase.client.core.retry.RetryReason; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -114,6 +116,29 @@ void findByIdWithExpiry() { } + @Test + void findByIdWithLock() { + try { + User user = new User(UUID.randomUUID().toString(), "user1", "user1"); + + reactiveCouchbaseTemplate.upsertById(User.class).one(user).block(); + + User foundUser = reactiveCouchbaseTemplate.findById(User.class).withLock(Duration.ofSeconds(2)) + .one(user.getId()).block(); + user.setVersion(foundUser.getVersion());// version will have changed + assertEquals(user, foundUser); + + TimeoutException exception = assertThrows(TimeoutException.class, () -> + reactiveCouchbaseTemplate.upsertById(User.class).one(user).block() + ); + assertTrue(exception.retryReasons().contains(RetryReason.KV_LOCKED), "should have been locked"); + } finally { + sleepSecs(2); + reactiveCouchbaseTemplate.removeByQuery(User.class).withConsistency(REQUEST_PLUS).all().collectList().block(); + } + + } + @Test void upsertAndFindById() { User user = new User(UUID.randomUUID().toString(), "firstname", "lastname");