Skip to content

Commit 544924e

Browse files
author
Alexandre Strubel
committed
GH-3272: Support lease renewal for distributed locks
1 parent 0dc03ad commit 544924e

File tree

6 files changed

+206
-5
lines changed

6 files changed

+206
-5
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -43,6 +43,7 @@
4343
* @author Artem Bilan
4444
* @author Glenn Renfro
4545
* @author Gary Russell
46+
* @author Alexandre Strubel
4647
*
4748
* @since 4.3
4849
*/
@@ -179,4 +180,9 @@ private void deleteExpired(String lock) {
179180
new Date(System.currentTimeMillis() - this.ttl));
180181
}
181182

183+
@Override
184+
public boolean renew(String lock) {
185+
return this.template.update(this.updateQuery, new Date(), this.region, lock, this.id) > 0;
186+
}
187+
182188
}

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/JdbcLockRegistry.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,11 @@
4949
* @author Bartosz Rempuszewski
5050
* @author Gary Russell
5151
* @author Alexandre Strubel
52+
* @author Stefan Vassilev
5253
*
5354
* @since 4.3
5455
*/
55-
public class JdbcLockRegistry implements ExpirableLockRegistry {
56+
public class JdbcLockRegistry implements ExpirableLockRegistry, RenewableLockRegistry {
5657

5758
private static final int DEFAULT_IDLE = 100;
5859

@@ -101,6 +102,20 @@ public void expireUnusedOlderThan(long age) {
101102
}
102103
}
103104

105+
@Override
106+
public void renewLock(Lock lock) {
107+
Assert.isInstanceOf(JdbcLock.class, lock);
108+
JdbcLock jdbcLock = (JdbcLock) lock;
109+
110+
if (!jdbcLock.isAcquiredInThisProcess()) {
111+
throw new IllegalMonitorStateException("You do not own mutex at " + jdbcLock.path);
112+
}
113+
114+
if (!jdbcLock.renew()) {
115+
throw new IllegalStateException("Could not renew mutex at " + jdbcLock.path);
116+
}
117+
}
118+
104119
private static final class JdbcLock implements Lock {
105120

106121
private final LockRepository mutex;
@@ -264,6 +279,21 @@ public boolean isAcquiredInThisProcess() {
264279
return this.mutex.isAcquired(this.path);
265280
}
266281

282+
public boolean renew() {
283+
if (!this.delegate.isHeldByCurrentThread()) {
284+
throw new IllegalMonitorStateException("You do not own mutex at " + this.path);
285+
}
286+
while (true) {
287+
try {
288+
return this.mutex.renew(this.path);
289+
} catch (TransientDataAccessException e) {
290+
// try again
291+
} catch (Exception e) {
292+
throw new DataAccessResourceFailureException("Failed to renew mutex at " + this.path, e);
293+
}
294+
}
295+
}
296+
267297
}
268298

269299
}

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/LockRepository.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,6 +24,8 @@
2424
* has to be declared as a bean.
2525
*
2626
* @author Dave Syer
27+
* @author Alexandre Strubel
28+
*
2729
* @since 4.3
2830
*/
2931
public interface LockRepository extends Closeable {
@@ -34,6 +36,8 @@ public interface LockRepository extends Closeable {
3436

3537
boolean acquire(String lock);
3638

39+
boolean renew(String lock);
40+
3741
@Override
3842
void close();
3943

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.jdbc.lock;
18+
19+
import org.springframework.integration.support.locks.LockRegistry;
20+
21+
import java.util.concurrent.locks.Lock;
22+
23+
/**
24+
* A {@link LockRegistry} implementing this interface supports the renewal of the time to live of a lock
25+
*
26+
* @author Alexandre Strubel
27+
*
28+
* @since 4.3
29+
*/
30+
public interface RenewableLockRegistry extends LockRegistry {
31+
32+
/**
33+
* Renew the time to live of a 'lock' held by the current thread
34+
* @param lock The lock to renew its time to live
35+
*/
36+
void renewLock(Lock lock);
37+
38+
}

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDifferentClientTests.java

Lines changed: 108 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -52,6 +52,7 @@
5252
* @author Dave Syer
5353
* @author Artem Bilan
5454
* @author Glenn Renfro
55+
* @author Alexandre Strubel
5556
*
5657
* @since 4.3
5758
*/
@@ -276,4 +277,110 @@ public void testExclusiveAccess() throws Exception {
276277
}
277278
}
278279

280+
@Test
281+
public void testOutOfDateLockTaken() throws Exception {
282+
DefaultLockRepository client1 = new DefaultLockRepository(dataSource);
283+
client1.setTimeToLive(500);
284+
client1.afterPropertiesSet();
285+
final DefaultLockRepository client2 = new DefaultLockRepository(dataSource);
286+
client2.afterPropertiesSet();
287+
Lock lock1 = new JdbcLockRegistry(client1).obtain("foo");
288+
final BlockingQueue<Integer> data = new LinkedBlockingQueue<>();
289+
final CountDownLatch latch1 = new CountDownLatch(1);
290+
final CountDownLatch latch2 = new CountDownLatch(1);
291+
lock1.lockInterruptibly();
292+
Thread.sleep(500);
293+
new SimpleAsyncTaskExecutor()
294+
.execute(() -> {
295+
Lock lock2 = new JdbcLockRegistry(client2).obtain("foo");
296+
try {
297+
latch1.countDown();
298+
StopWatch stopWatch = new StopWatch();
299+
stopWatch.start();
300+
lock2.lockInterruptibly();
301+
stopWatch.stop();
302+
data.add(1);
303+
}
304+
catch (InterruptedException e) {
305+
Thread.currentThread().interrupt();
306+
}
307+
finally {
308+
lock2.unlock();
309+
}
310+
latch2.countDown();
311+
});
312+
assertThat(latch1.await(10, TimeUnit.SECONDS)).isTrue();
313+
assertThat(latch2.await(10, TimeUnit.SECONDS)).isTrue();
314+
data.add(2);
315+
lock1.unlock();
316+
for (int i = 0; i < 2; i++) {
317+
Integer integer = data.poll(10, TimeUnit.SECONDS);
318+
assertThat(integer).isNotNull();
319+
assertThat(integer.intValue()).isEqualTo(i + 1);
320+
}
321+
}
322+
323+
@Test
324+
public void testRenewLock() throws Exception {
325+
DefaultLockRepository client1 = new DefaultLockRepository(dataSource);
326+
client1.setTimeToLive(500);
327+
client1.afterPropertiesSet();
328+
final DefaultLockRepository client2 = new DefaultLockRepository(dataSource);
329+
client2.afterPropertiesSet();
330+
JdbcLockRegistry registry = new JdbcLockRegistry(client1);
331+
Lock lock1 = registry.obtain("foo");
332+
final BlockingQueue<Integer> data = new LinkedBlockingQueue<>();
333+
final CountDownLatch latch1 = new CountDownLatch(2);
334+
final CountDownLatch latch2 = new CountDownLatch(1);
335+
lock1.lockInterruptibly();
336+
new SimpleAsyncTaskExecutor()
337+
.execute(() -> {
338+
Lock lock2 = new JdbcLockRegistry(client2).obtain("foo");
339+
try {
340+
latch1.countDown();
341+
StopWatch stopWatch = new StopWatch();
342+
stopWatch.start();
343+
lock2.lockInterruptibly();
344+
stopWatch.stop();
345+
data.add(4);
346+
Thread.sleep(10);
347+
data.add(5);
348+
Thread.sleep(10);
349+
data.add(6);
350+
}
351+
catch (InterruptedException e) {
352+
Thread.currentThread().interrupt();
353+
}
354+
finally {
355+
lock2.unlock();
356+
}
357+
});
358+
new SimpleAsyncTaskExecutor()
359+
.execute(() -> {
360+
try {
361+
latch1.countDown();
362+
Thread.sleep(1000);
363+
data.add(1);
364+
Thread.sleep(100);
365+
data.add(2);
366+
Thread.sleep(100);
367+
data.add(3);
368+
latch2.countDown();
369+
}
370+
catch (InterruptedException e) {
371+
Thread.currentThread().interrupt();
372+
}
373+
});
374+
assertThat(latch1.await(10, TimeUnit.SECONDS)).isTrue();
375+
while (latch2.getCount() > 0) {
376+
Thread.sleep(100);
377+
registry.renewLock(lock1);
378+
}
379+
lock1.unlock();
380+
for (int i = 0; i < 6; i++) {
381+
Integer integer = data.poll(10, TimeUnit.SECONDS);
382+
assertThat(integer).isNotNull();
383+
assertThat(integer.intValue()).isEqualTo(i + 1);
384+
}
385+
}
279386
}

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@
2727

2828
import org.junit.Before;
2929
import org.junit.Test;
30+
import org.junit.jupiter.api.Assertions;
3031
import org.junit.runner.RunWith;
3132

3233
import org.springframework.beans.factory.annotation.Autowired;
@@ -40,6 +41,7 @@
4041
/**
4142
* @author Dave Syer
4243
* @author Artem Bilan
44+
* @author Stefan Vassilev
4345
*
4446
* @since 4.3
4547
*/
@@ -266,4 +268,18 @@ public void testTwoThreadsWrongOneUnlocks() throws Exception {
266268
assertThat(((Exception) imse).getMessage()).contains("You do not own");
267269
}
268270

271+
@Test
272+
public void testLockRenew() {
273+
final Lock lock = this.registry.obtain("foo");
274+
275+
assertThat(lock.tryLock()).isTrue();
276+
registry.renewLock(lock);
277+
}
278+
279+
@Test
280+
public void testLockRenewLockNotOwned() {
281+
final Lock lock = this.registry.obtain("foo");
282+
283+
Assertions.assertThrows(IllegalMonitorStateException.class, () -> registry.renewLock(lock));
284+
}
269285
}

0 commit comments

Comments
 (0)