Skip to content

Commit d4786ff

Browse files
author
Alexandre Strubel
committed
GH-3272: Support lease renewal for distributed locks
1 parent 0dc03ad commit d4786ff

File tree

8 files changed

+227
-9
lines changed

8 files changed

+227
-9
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.support.locks;
18+
19+
/**
20+
* A {@link LockRegistry} implementing this interface supports the renewal of the time to live of a lock
21+
*
22+
* @author Alexandre Strubel
23+
*
24+
* @since 5.4
25+
*/
26+
public interface RenewableLockRegistry extends LockRegistry {
27+
28+
/**
29+
* Renew the time to live of the lock is associated with the parameter object.
30+
* The lock must be held by the current thread
31+
* @param lockKey The object with which the lock is associated.
32+
*/
33+
void renewLock(Object lockKey);
34+
35+
}

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -43,6 +43,7 @@
4343
* @author Artem Bilan
4444
* @author Glenn Renfro
4545
* @author Gary Russell
46+
* @author Alexandre Strubel
4647
*
4748
* @since 4.3
4849
*/
@@ -179,4 +180,9 @@ private void deleteExpired(String lock) {
179180
new Date(System.currentTimeMillis() - this.ttl));
180181
}
181182

183+
@Override
184+
public boolean renew(String lock) {
185+
return this.template.update(this.updateQuery, new Date(), this.region, lock, this.id) > 0;
186+
}
187+
182188
}

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/JdbcLockRegistry.java

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.springframework.dao.DataAccessResourceFailureException;
3131
import org.springframework.dao.TransientDataAccessException;
3232
import org.springframework.integration.support.locks.ExpirableLockRegistry;
33+
import org.springframework.integration.support.locks.RenewableLockRegistry;
3334
import org.springframework.integration.util.UUIDConverter;
3435
import org.springframework.transaction.TransactionTimedOutException;
3536
import org.springframework.util.Assert;
@@ -49,10 +50,11 @@
4950
* @author Bartosz Rempuszewski
5051
* @author Gary Russell
5152
* @author Alexandre Strubel
53+
* @author Stefan Vassilev
5254
*
5355
* @since 4.3
5456
*/
55-
public class JdbcLockRegistry implements ExpirableLockRegistry {
57+
public class JdbcLockRegistry implements ExpirableLockRegistry, RenewableLockRegistry {
5658

5759
private static final int DEFAULT_IDLE = 100;
5860

@@ -101,6 +103,19 @@ public void expireUnusedOlderThan(long age) {
101103
}
102104
}
103105

106+
@Override
107+
public void renewLock(Object lockKey) {
108+
Assert.isInstanceOf(String.class, lockKey);
109+
String path = pathFor((String) lockKey);
110+
JdbcLock jdbcLock = this.locks.get(path);
111+
if (jdbcLock == null) {
112+
throw new IllegalStateException("Could not found mutex at " + path);
113+
}
114+
if (!jdbcLock.renew()) {
115+
throw new IllegalStateException("Could not renew mutex at " + path);
116+
}
117+
}
118+
104119
private static final class JdbcLock implements Lock {
105120

106121
private final LockRepository mutex;
@@ -232,7 +247,7 @@ private boolean doLock() {
232247
@Override
233248
public void unlock() {
234249
if (!this.delegate.isHeldByCurrentThread()) {
235-
throw new IllegalMonitorStateException("You do not own mutex at " + this.path);
250+
throw new IllegalMonitorStateException("The current thread doesn't own mutex at " + this.path);
236251
}
237252
if (this.delegate.getHoldCount() > 1) {
238253
this.delegate.unlock();
@@ -243,7 +258,7 @@ public void unlock() {
243258
this.mutex.delete(this.path);
244259
return;
245260
}
246-
catch (TransientDataAccessException e) {
261+
catch (TransientDataAccessException | TransactionTimedOutException e) {
247262
// try again
248263
}
249264
catch (Exception e) {
@@ -264,6 +279,27 @@ public boolean isAcquiredInThisProcess() {
264279
return this.mutex.isAcquired(this.path);
265280
}
266281

282+
public boolean renew() {
283+
if (!this.delegate.isHeldByCurrentThread()) {
284+
throw new IllegalMonitorStateException("The current thread doesn't own mutex at " + this.path);
285+
}
286+
while (true) {
287+
try {
288+
boolean renewed = this.mutex.renew(this.path);
289+
if (renewed) {
290+
this.lastUsed = System.currentTimeMillis();
291+
}
292+
return renewed;
293+
}
294+
catch (TransientDataAccessException | TransactionTimedOutException e) {
295+
// try again
296+
}
297+
catch (Exception e) {
298+
throw new DataAccessResourceFailureException("Failed to renew mutex at " + this.path, e);
299+
}
300+
}
301+
}
302+
267303
}
268304

269305
}

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/LockRepository.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,6 +24,8 @@
2424
* has to be declared as a bean.
2525
*
2626
* @author Dave Syer
27+
* @author Alexandre Strubel
28+
*
2729
* @since 4.3
2830
*/
2931
public interface LockRepository extends Closeable {
@@ -34,6 +36,8 @@ public interface LockRepository extends Closeable {
3436

3537
boolean acquire(String lock);
3638

39+
boolean renew(String lock);
40+
3741
@Override
3842
void close();
3943

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDifferentClientTests.java

Lines changed: 108 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -52,6 +52,7 @@
5252
* @author Dave Syer
5353
* @author Artem Bilan
5454
* @author Glenn Renfro
55+
* @author Alexandre Strubel
5556
*
5657
* @since 4.3
5758
*/
@@ -276,4 +277,110 @@ public void testExclusiveAccess() throws Exception {
276277
}
277278
}
278279

280+
@Test
281+
public void testOutOfDateLockTaken() throws Exception {
282+
DefaultLockRepository client1 = new DefaultLockRepository(dataSource);
283+
client1.setTimeToLive(500);
284+
client1.afterPropertiesSet();
285+
final DefaultLockRepository client2 = new DefaultLockRepository(dataSource);
286+
client2.afterPropertiesSet();
287+
Lock lock1 = new JdbcLockRegistry(client1).obtain("foo");
288+
final BlockingQueue<Integer> data = new LinkedBlockingQueue<>();
289+
final CountDownLatch latch1 = new CountDownLatch(1);
290+
final CountDownLatch latch2 = new CountDownLatch(1);
291+
lock1.lockInterruptibly();
292+
Thread.sleep(500);
293+
new SimpleAsyncTaskExecutor()
294+
.execute(() -> {
295+
Lock lock2 = new JdbcLockRegistry(client2).obtain("foo");
296+
try {
297+
latch1.countDown();
298+
StopWatch stopWatch = new StopWatch();
299+
stopWatch.start();
300+
lock2.lockInterruptibly();
301+
stopWatch.stop();
302+
data.add(1);
303+
}
304+
catch (InterruptedException e) {
305+
Thread.currentThread().interrupt();
306+
}
307+
finally {
308+
lock2.unlock();
309+
}
310+
latch2.countDown();
311+
});
312+
assertThat(latch1.await(10, TimeUnit.SECONDS)).isTrue();
313+
assertThat(latch2.await(10, TimeUnit.SECONDS)).isTrue();
314+
data.add(2);
315+
lock1.unlock();
316+
for (int i = 0; i < 2; i++) {
317+
Integer integer = data.poll(10, TimeUnit.SECONDS);
318+
assertThat(integer).isNotNull();
319+
assertThat(integer.intValue()).isEqualTo(i + 1);
320+
}
321+
}
322+
323+
@Test
324+
public void testRenewLock() throws Exception {
325+
DefaultLockRepository client1 = new DefaultLockRepository(dataSource);
326+
client1.setTimeToLive(500);
327+
client1.afterPropertiesSet();
328+
final DefaultLockRepository client2 = new DefaultLockRepository(dataSource);
329+
client2.afterPropertiesSet();
330+
JdbcLockRegistry registry = new JdbcLockRegistry(client1);
331+
Lock lock1 = registry.obtain("foo");
332+
final BlockingQueue<Integer> data = new LinkedBlockingQueue<>();
333+
final CountDownLatch latch1 = new CountDownLatch(2);
334+
final CountDownLatch latch2 = new CountDownLatch(1);
335+
lock1.lockInterruptibly();
336+
new SimpleAsyncTaskExecutor()
337+
.execute(() -> {
338+
Lock lock2 = new JdbcLockRegistry(client2).obtain("foo");
339+
try {
340+
latch1.countDown();
341+
StopWatch stopWatch = new StopWatch();
342+
stopWatch.start();
343+
lock2.lockInterruptibly();
344+
stopWatch.stop();
345+
data.add(4);
346+
Thread.sleep(10);
347+
data.add(5);
348+
Thread.sleep(10);
349+
data.add(6);
350+
}
351+
catch (InterruptedException e) {
352+
Thread.currentThread().interrupt();
353+
}
354+
finally {
355+
lock2.unlock();
356+
}
357+
});
358+
new SimpleAsyncTaskExecutor()
359+
.execute(() -> {
360+
try {
361+
latch1.countDown();
362+
Thread.sleep(1000);
363+
data.add(1);
364+
Thread.sleep(100);
365+
data.add(2);
366+
Thread.sleep(100);
367+
data.add(3);
368+
latch2.countDown();
369+
}
370+
catch (InterruptedException e) {
371+
Thread.currentThread().interrupt();
372+
}
373+
});
374+
assertThat(latch1.await(10, TimeUnit.SECONDS)).isTrue();
375+
while (latch2.getCount() > 0) {
376+
Thread.sleep(100);
377+
registry.renewLock("foo");
378+
}
379+
lock1.unlock();
380+
for (int i = 0; i < 6; i++) {
381+
Integer integer = data.poll(10, TimeUnit.SECONDS);
382+
assertThat(integer).isNotNull();
383+
assertThat(integer.intValue()).isEqualTo(i + 1);
384+
}
385+
}
279386
}

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@
2727

2828
import org.junit.Before;
2929
import org.junit.Test;
30+
import org.junit.jupiter.api.Assertions;
3031
import org.junit.runner.RunWith;
3132

3233
import org.springframework.beans.factory.annotation.Autowired;
@@ -40,6 +41,7 @@
4041
/**
4142
* @author Dave Syer
4243
* @author Artem Bilan
44+
* @author Stefan Vassilev
4345
*
4446
* @since 4.3
4547
*/
@@ -168,7 +170,7 @@ public void testTwoThreadsSecondFailsToGetLock() throws Exception {
168170
lock1.unlock();
169171
Object ise = result.get(10, TimeUnit.SECONDS);
170172
assertThat(ise).isInstanceOf(IllegalMonitorStateException.class);
171-
assertThat(((Exception) ise).getMessage()).contains("You do not own");
173+
assertThat(((Exception) ise).getMessage()).contains("own");
172174
}
173175

174176
@Test
@@ -263,7 +265,25 @@ public void testTwoThreadsWrongOneUnlocks() throws Exception {
263265
lock.unlock();
264266
Object imse = result.get(10, TimeUnit.SECONDS);
265267
assertThat(imse).isInstanceOf(IllegalMonitorStateException.class);
266-
assertThat(((Exception) imse).getMessage()).contains("You do not own");
268+
assertThat(((Exception) imse).getMessage()).contains("own");
267269
}
268270

271+
@Test
272+
public void testLockRenew() {
273+
final Lock lock = this.registry.obtain("foo");
274+
275+
assertThat(lock.tryLock()).isTrue();
276+
try {
277+
registry.renewLock("foo");
278+
} finally {
279+
lock.unlock();
280+
}
281+
}
282+
283+
@Test
284+
public void testLockRenewLockNotOwned() {
285+
this.registry.obtain("foo");
286+
287+
Assertions.assertThrows(IllegalMonitorStateException.class, () -> registry.renewLock("foo"));
288+
}
269289
}

src/reference/asciidoc/jdbc.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1085,6 +1085,11 @@ If so, you can specify the `id` to be associated with the `DefaultLockRepository
10851085
Starting with version 5.1.8, the `JdbcLockRegistry` can be configured with the `idleBetweenTries` - a `Duration` to sleep between lock record insert/update executions.
10861086
By default it is `100` milliseconds and in some environments non-leaders pollute connections with data source too often.
10871087

1088+
Starting with version 5.4, the `RenewableLockRegistry` interface has been introduced and added to `JdbcLockRegistry`.
1089+
The `renewLock()` method must be called during locked process in case of the locked process would be longer than time to live of the lock.
1090+
So the time to live can be highly reduce and deployments can retake a lost lock quickly.
1091+
NB. The lock renewal can be done only if the lock is held by the current thread, so the locked process has to be executed in another thread.
1092+
10881093
[[jdbc-metadata-store]]
10891094
=== JDBC Metadata Store
10901095

src/reference/asciidoc/whats-new.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ See <<./r2dbc.adoc#r2dbc,R2DBC Support>> for more information.
3030
The Channel Adapters for Redis Stream support have been introduced.
3131
See <<./redis.adoc#redis-stream-outbound,Redis Stream Outbound Channel Adapter>> for more information.
3232

33+
==== Renewable Lock Registry
34+
35+
A Renewable lock registry has been introduced to allow renew lease of a distributed lock.
36+
See <<./jdbc.adoc#jdbc-lock-registry,JDBC implementation>> for more information.
37+
3338
[[x5.4-general]]
3439
=== General Changes
3540

0 commit comments

Comments
 (0)