Skip to content

Commit 1a97c73

Browse files
babltigamikereiche
authored andcommitted
Added support of the read locks (#1655)
Closes #1059.
1 parent f4f9143 commit 1a97c73

7 files changed

+164
-20
lines changed

src/main/java/org/springframework/data/couchbase/core/ExecutableFindByIdOperation.java

+17-1
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@
2626

2727
import com.couchbase.client.java.kv.GetOptions;
2828
import org.springframework.data.couchbase.core.support.WithExpiry;
29+
import org.springframework.data.couchbase.core.support.WithLock;
2930

3031
/**
3132
* Get Operations
3233
*
3334
* @author Christoph Strobl
35+
* @author Tigran Babloyan
3436
* @since 2.0
3537
*/
3638
public interface ExecutableFindByIdOperation {
@@ -132,11 +134,25 @@ interface FindByIdWithExpiry<T> extends FindByIdWithProjection<T>, WithExpiry<T>
132134
FindByIdWithProjection<T> withExpiry(Duration expiry);
133135
}
134136

137+
interface FindByIdWithLock<T> extends FindByIdWithExpiry<T>, WithLock<T> {
138+
/**
139+
* Fetches document and write-locks it for the given duration.
140+
* <p>
141+
* Note that the client does not enforce an upper limit on the {@link Duration} lockTime. The maximum lock time
142+
* by default on the server is 30 seconds. Any value larger than 30 seconds will be capped down by the server to
143+
* the default lock time, which is 15 seconds unless modified on the server side.
144+
*
145+
* @param lockDuration how long to write-lock the document for (any duration > 30s will be capped to server default of 15s).
146+
*/
147+
@Override
148+
FindByIdWithExpiry<T> withLock(Duration lockDuration);
149+
}
150+
135151
/**
136152
* Provides methods for constructing query operations in a fluent way.
137153
*
138154
* @param <T> the entity type to use for the results
139155
*/
140-
interface ExecutableFindById<T> extends FindByIdWithExpiry<T> {}
156+
interface ExecutableFindById<T> extends FindByIdWithLock<T> {}
141157

142158
}

src/main/java/org/springframework/data/couchbase/core/ExecutableFindByIdOperationSupport.java

+15-8
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class ExecutableFindByIdOperationSupport implements ExecutableFindByIdOpe
3737
@Override
3838
public <T> ExecutableFindById<T> findById(Class<T> domainType) {
3939
return new ExecutableFindByIdSupport<>(template, domainType, OptionsBuilder.getScopeFrom(domainType),
40-
OptionsBuilder.getCollectionFrom(domainType),null, null, null);
40+
OptionsBuilder.getCollectionFrom(domainType),null, null, null, null);
4141
}
4242

4343
static class ExecutableFindByIdSupport<T> implements ExecutableFindById<T> {
@@ -49,19 +49,21 @@ static class ExecutableFindByIdSupport<T> implements ExecutableFindById<T> {
4949
private final GetOptions options;
5050
private final List<String> fields;
5151
private final Duration expiry;
52+
private final Duration lockDuration;
5253
private final ReactiveFindByIdSupport<T> reactiveSupport;
5354

5455
ExecutableFindByIdSupport(CouchbaseTemplate template, Class<T> domainType, String scope, String collection,
55-
GetOptions options, List<String> fields, Duration expiry) {
56+
GetOptions options, List<String> fields, Duration expiry, Duration lockDuration) {
5657
this.template = template;
5758
this.domainType = domainType;
5859
this.scope = scope;
5960
this.collection = collection;
6061
this.options = options;
6162
this.fields = fields;
6263
this.expiry = expiry;
64+
this.lockDuration = lockDuration;
6365
this.reactiveSupport = new ReactiveFindByIdSupport<>(template.reactive(), domainType, scope, collection, options,
64-
fields, expiry, new NonReactiveSupportWrapper(template.support()));
66+
fields, expiry, lockDuration, new NonReactiveSupportWrapper(template.support()));
6567
}
6668

6769
@Override
@@ -77,31 +79,36 @@ public Collection<? extends T> all(final Collection<String> ids) {
7779
@Override
7880
public TerminatingFindById<T> withOptions(final GetOptions options) {
7981
Assert.notNull(options, "Options must not be null.");
80-
return new ExecutableFindByIdSupport<>(template, domainType, scope, collection, options, fields, expiry);
82+
return new ExecutableFindByIdSupport<>(template, domainType, scope, collection, options, fields, expiry, lockDuration);
8183
}
8284

8385
@Override
8486
public FindByIdWithOptions<T> inCollection(final String collection) {
85-
return new ExecutableFindByIdSupport<>(template, domainType, scope, collection != null ? collection : this.collection, options, fields, expiry);
87+
return new ExecutableFindByIdSupport<>(template, domainType, scope, collection != null ? collection : this.collection, options, fields, expiry, lockDuration);
8688
}
8789

8890
@Override
8991
public FindByIdInCollection<T> inScope(final String scope) {
90-
return new ExecutableFindByIdSupport<>(template, domainType, scope != null ? scope : this.scope, collection, options, fields, expiry);
92+
return new ExecutableFindByIdSupport<>(template, domainType, scope != null ? scope : this.scope, collection, options, fields, expiry, lockDuration);
9193
}
9294

9395
@Override
9496
public FindByIdInScope<T> project(String... fields) {
9597
Assert.notEmpty(fields, "Fields must not be null.");
96-
return new ExecutableFindByIdSupport<>(template, domainType, scope, collection, options, Arrays.asList(fields), expiry);
98+
return new ExecutableFindByIdSupport<>(template, domainType, scope, collection, options, Arrays.asList(fields), expiry, lockDuration);
9799
}
98100

99101
@Override
100102
public FindByIdWithProjection<T> withExpiry(final Duration expiry) {
101103
return new ExecutableFindByIdSupport<>(template, domainType, scope, collection, options, fields,
102-
expiry);
104+
expiry, lockDuration);
103105
}
104106

107+
@Override
108+
public FindByIdWithExpiry<T> withLock(final Duration lockDuration) {
109+
return new ExecutableFindByIdSupport<>(template, domainType, scope, collection, options, fields,
110+
expiry, lockDuration);
111+
}
105112
}
106113

107114
}

src/main/java/org/springframework/data/couchbase/core/ReactiveFindByIdOperation.java

+17-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.springframework.data.couchbase.core.support.InScope;
2626
import org.springframework.data.couchbase.core.support.OneAndAllIdReactive;
2727
import org.springframework.data.couchbase.core.support.WithExpiry;
28+
import org.springframework.data.couchbase.core.support.WithLock;
2829
import org.springframework.data.couchbase.core.support.WithGetOptions;
2930
import org.springframework.data.couchbase.core.support.WithProjectionId;
3031

@@ -34,6 +35,7 @@
3435
* Get Operations
3536
*
3637
* @author Christoph Strobl
38+
* @author Tigran Babloyan
3739
* @since 2.0
3840
*/
3941
public interface ReactiveFindByIdOperation {
@@ -136,11 +138,25 @@ interface FindByIdWithExpiry<T> extends FindByIdWithProjection<T>, WithExpiry<T>
136138
FindByIdWithProjection<T> withExpiry(Duration expiry);
137139
}
138140

141+
interface FindByIdWithLock<T> extends FindByIdWithExpiry<T>, WithLock<T> {
142+
/**
143+
* Fetches document and write-locks it for the given duration.
144+
* <p>
145+
* Note that the client does not enforce an upper limit on the {@link Duration} lockTime. The maximum lock time
146+
* by default on the server is 30 seconds. Any value larger than 30 seconds will be capped down by the server to
147+
* the default lock time, which is 15 seconds unless modified on the server side.
148+
*
149+
* @param lockDuration how long to write-lock the document for (any duration > 30s will be capped to server default of 15s).
150+
*/
151+
@Override
152+
FindByIdWithExpiry<T> withLock(Duration lockDuration);
153+
}
154+
139155
/**
140156
* Provides methods for constructing query operations in a fluent way.
141157
*
142158
* @param <T> the entity type to use for the results
143159
*/
144-
interface ReactiveFindById<T> extends FindByIdWithExpiry<T> {}
160+
interface ReactiveFindById<T> extends FindByIdWithLock<T> {}
145161

146162
}

src/main/java/org/springframework/data/couchbase/core/ReactiveFindByIdOperationSupport.java

+37-10
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616
package org.springframework.data.couchbase.core;
1717

1818
import static com.couchbase.client.java.kv.GetAndTouchOptions.getAndTouchOptions;
19+
import static com.couchbase.client.java.kv.GetAndLockOptions.getAndLockOptions;
1920
import static com.couchbase.client.java.transactions.internal.ConverterUtil.makeCollectionIdentifier;
2021

22+
import com.couchbase.client.java.kv.GetAndLockOptions;
2123
import reactor.core.publisher.Flux;
2224
import reactor.core.publisher.Mono;
2325

@@ -26,6 +28,7 @@
2628
import java.util.Arrays;
2729
import java.util.Collection;
2830
import java.util.List;
31+
import java.util.Optional;
2932

3033
import org.slf4j.Logger;
3134
import org.slf4j.LoggerFactory;
@@ -45,6 +48,7 @@
4548
* {@link ReactiveFindByIdOperation} implementations for Couchbase.
4649
*
4750
* @author Michael Reiche
51+
* @author Tigran Babloyan
4852
*/
4953
public class ReactiveFindByIdOperationSupport implements ReactiveFindByIdOperation {
5054

@@ -58,7 +62,7 @@ public class ReactiveFindByIdOperationSupport implements ReactiveFindByIdOperati
5862
@Override
5963
public <T> ReactiveFindById<T> findById(Class<T> domainType) {
6064
return new ReactiveFindByIdSupport<>(template, domainType, OptionsBuilder.getScopeFrom(domainType),
61-
OptionsBuilder.getCollectionFrom(domainType), null, null, null, template.support());
65+
OptionsBuilder.getCollectionFrom(domainType), null, null, null, null, template.support());
6266
}
6367

6468
static class ReactiveFindByIdSupport<T> implements ReactiveFindById<T> {
@@ -71,11 +75,12 @@ static class ReactiveFindByIdSupport<T> implements ReactiveFindById<T> {
7175
private final List<String> fields;
7276
private final ReactiveTemplateSupport support;
7377
private final Duration expiry;
78+
private final Duration lockDuration;
7479

7580
private Duration expiryToUse;
7681

7782
ReactiveFindByIdSupport(ReactiveCouchbaseTemplate template, Class<T> domainType, String scope, String collection,
78-
CommonOptions<?> options, List<String> fields, Duration expiry, ReactiveTemplateSupport support) {
83+
CommonOptions<?> options, List<String> fields, Duration expiry, Duration lockDuration, ReactiveTemplateSupport support) {
7984
this.template = template;
8085
this.domainType = domainType;
8186
this.scope = scope;
@@ -84,6 +89,7 @@ static class ReactiveFindByIdSupport<T> implements ReactiveFindById<T> {
8489
this.fields = fields;
8590
this.expiry = expiry;
8691
this.support = support;
92+
this.lockDuration = lockDuration;
8793
}
8894

8995
@Override
@@ -99,8 +105,12 @@ public Mono<T> one(final Object id) {
99105

100106
Mono<T> reactiveEntity = TransactionalSupport.checkForTransactionInThreadLocalStorage().flatMap(ctxOpt -> {
101107
if (!ctxOpt.isPresent()) {
102-
if (pArgs.getOptions() instanceof GetAndTouchOptions) {
103-
return rc.getAndTouch(id.toString(), expiryToUse, (GetAndTouchOptions) pArgs.getOptions())
108+
if (pArgs.getOptions() instanceof GetAndTouchOptions options) {
109+
return rc.getAndTouch(id.toString(), expiryToUse, options)
110+
.flatMap(result -> support.decodeEntity(id, result.contentAs(String.class), result.cas(), domainType,
111+
pArgs.getScope(), pArgs.getCollection(), null, null));
112+
} else if (pArgs.getOptions() instanceof GetAndLockOptions options) {
113+
return rc.getAndLock(id.toString(), Optional.of(lockDuration).orElse(Duration.ZERO), options)
104114
.flatMap(result -> support.decodeEntity(id, result.contentAs(String.class), result.cas(), domainType,
105115
pArgs.getScope(), pArgs.getCollection(), null, null));
106116
} else {
@@ -132,6 +142,9 @@ public Mono<T> one(final Object id) {
132142
}
133143

134144
private void rejectInvalidTransactionalOptions() {
145+
if (this.lockDuration != null) {
146+
throw new IllegalArgumentException("withLock is not supported in a transaction");
147+
}
135148
if (this.expiry != null) {
136149
throw new IllegalArgumentException("withExpiry is not supported in a transaction");
137150
}
@@ -151,39 +164,53 @@ public Flux<? extends T> all(final Collection<String> ids) {
151164
@Override
152165
public FindByIdInScope<T> withOptions(final GetOptions options) {
153166
Assert.notNull(options, "Options must not be null.");
154-
return new ReactiveFindByIdSupport<>(template, domainType, scope, collection, options, fields, expiry, support);
167+
return new ReactiveFindByIdSupport<>(template, domainType, scope, collection, options, fields, expiry,
168+
lockDuration, support);
155169
}
156170

157171
@Override
158172
public FindByIdWithOptions<T> inCollection(final String collection) {
159173
return new ReactiveFindByIdSupport<>(template, domainType, scope,
160-
collection != null ? collection : this.collection, options, fields, expiry, support);
174+
collection != null ? collection : this.collection, options, fields, expiry, lockDuration, support);
161175
}
162176

163177
@Override
164178
public FindByIdInCollection<T> inScope(final String scope) {
165179
return new ReactiveFindByIdSupport<>(template, domainType, scope != null ? scope : this.scope, collection,
166-
options, fields, expiry, support);
180+
options, fields, expiry, lockDuration, support);
167181
}
168182

169183
@Override
170184
public FindByIdInCollection<T> project(String... fields) {
171185
Assert.notNull(fields, "Fields must not be null");
172186
return new ReactiveFindByIdSupport<>(template, domainType, scope, collection, options, Arrays.asList(fields),
173-
expiry, support);
187+
expiry, lockDuration, support);
174188
}
175189

176190
@Override
177191
public FindByIdWithProjection<T> withExpiry(final Duration expiry) {
178-
return new ReactiveFindByIdSupport<>(template, domainType, scope, collection, options, fields, expiry, support);
192+
return new ReactiveFindByIdSupport<>(template, domainType, scope, collection, options, fields, expiry,
193+
lockDuration, support);
194+
}
195+
196+
@Override
197+
public FindByIdWithExpiry<T> withLock(final Duration lockDuration) {
198+
return new ReactiveFindByIdSupport<>(template, domainType, scope, collection, options, fields, expiry,
199+
lockDuration, support);
179200
}
180201

181202
private CommonOptions<?> initGetOptions() {
182203
CommonOptions<?> getOptions;
183204
final CouchbasePersistentEntity<?> entity = template.getConverter().getMappingContext()
184205
.getRequiredPersistentEntity(domainType);
185206
Boolean isTouchOnRead = entity.isTouchOnRead();
186-
if (expiry != null || isTouchOnRead || options instanceof GetAndTouchOptions) {
207+
if(lockDuration != null || options instanceof GetAndLockOptions) {
208+
GetAndLockOptions gOptions = options != null ? (GetAndLockOptions) options : getAndLockOptions();
209+
if (gOptions.build().transcoder() == null) {
210+
gOptions.transcoder(RawJsonTranscoder.INSTANCE);
211+
}
212+
getOptions = gOptions;
213+
} else if (expiry != null || isTouchOnRead || options instanceof GetAndTouchOptions) {
187214
if (expiry != null) {
188215
expiryToUse = expiry;
189216
} else if (isTouchOnRead) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2020-2023 the original author or authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.couchbase.core.support;
17+
18+
import java.time.Duration;
19+
20+
/**
21+
* A common interface for those that support withLock()
22+
*
23+
* @author Tigran Babloyan
24+
* @param <R> - the entity class
25+
*/
26+
public interface WithLock<R> {
27+
Object withLock(Duration lockDuration);
28+
29+
}

src/test/java/org/springframework/data/couchbase/core/CouchbaseTemplateKeyValueIntegrationTests.java

+24
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import java.util.Set;
3636
import java.util.UUID;
3737

38+
import com.couchbase.client.core.error.TimeoutException;
39+
import com.couchbase.client.core.retry.RetryReason;
3840
import org.junit.jupiter.api.BeforeEach;
3941
import org.junit.jupiter.api.Test;
4042
import org.springframework.beans.factory.annotation.Autowired;
@@ -80,6 +82,28 @@ public void beforeEach() {
8082
couchbaseTemplate.removeByQuery(User.class).withConsistency(QueryScanConsistency.REQUEST_PLUS).all();
8183
}
8284

85+
@Test
86+
void findByIdWithLock() {
87+
try {
88+
User user = new User(UUID.randomUUID().toString(), "user1", "user1");
89+
90+
couchbaseTemplate.upsertById(User.class).one(user);
91+
92+
User foundUser = couchbaseTemplate.findById(User.class).withLock(Duration.ofSeconds(2)).one(user.getId());
93+
user.setVersion(foundUser.getVersion());// version will have changed
94+
assertEquals(user, foundUser);
95+
96+
TimeoutException exception = assertThrows(TimeoutException.class, () ->
97+
couchbaseTemplate.upsertById(User.class).one(user)
98+
);
99+
assertTrue(exception.retryReasons().contains(RetryReason.KV_LOCKED), "should have been locked");
100+
} finally {
101+
sleepSecs(2);
102+
couchbaseTemplate.removeByQuery(User.class).withConsistency(QueryScanConsistency.REQUEST_PLUS).all();
103+
}
104+
105+
}
106+
83107
@Test
84108
void findByIdWithExpiry() {
85109
try {

0 commit comments

Comments
 (0)