Skip to content

Commit e2c6e77

Browse files
Alexandre Strubelartembilan
Alexandre Strubel
authored andcommitted
GH-3272: Add lease renewal for distributed locks
Fixes #3272 * Introduce a `RenewableLockRegistry` since not all `LockRegistry` implementations provide a way to renew the lease for the lock * Implement `RenewableLockRegistry` in the `JdbcLockRegistry` * Test and document the feature
1 parent 381a071 commit e2c6e77

File tree

8 files changed

+250
-30
lines changed

8 files changed

+250
-30
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.support.locks;
18+
19+
/**
20+
* A {@link LockRegistry} implementing this interface supports the renewal
21+
* of the time to live of a lock.
22+
*
23+
* @author Alexandre Strubel
24+
* @author Artem Bilan
25+
*
26+
* @since 5.4
27+
*/
28+
public interface RenewableLockRegistry extends LockRegistry {
29+
30+
/**
31+
* Renew the time to live of the lock is associated with the parameter object.
32+
* The lock must be held by the current thread
33+
* @param lockKey The object with which the lock is associated.
34+
*/
35+
void renewLock(Object lockKey);
36+
37+
}

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -43,6 +43,7 @@
4343
* @author Artem Bilan
4444
* @author Glenn Renfro
4545
* @author Gary Russell
46+
* @author Alexandre Strubel
4647
*
4748
* @since 4.3
4849
*/
@@ -80,7 +81,8 @@ public class DefaultLockRepository implements LockRepository, InitializingBean {
8081

8182
private String insertQuery = "INSERT INTO %sLOCK (REGION, LOCK_KEY, CLIENT_ID, CREATED_DATE) VALUES (?, ?, ?, ?)";
8283

83-
private String countQuery = "SELECT COUNT(REGION) FROM %sLOCK WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=? AND CREATED_DATE>=?";
84+
private String countQuery =
85+
"SELECT COUNT(REGION) FROM %sLOCK WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=? AND CREATED_DATE>=?";
8486

8587
/**
8688
* Constructor that initializes the client id that will be associated for
@@ -108,7 +110,7 @@ public DefaultLockRepository(DataSource dataSource, String id) {
108110
/**
109111
* A unique grouping identifier for all locks persisted with this store. Using
110112
* multiple regions allows the store to be partitioned (if necessary) for different
111-
* purposes. Defaults to <code>DEFAULT</code>.
113+
* purposes. Defaults to {@code DEFAULT}.
112114
* @param region the region name to set
113115
*/
114116
public void setRegion(String region) {
@@ -179,4 +181,9 @@ private void deleteExpired(String lock) {
179181
new Date(System.currentTimeMillis() - this.ttl));
180182
}
181183

184+
@Override
185+
public boolean renew(String lock) {
186+
return this.template.update(this.updateQuery, new Date(), this.region, lock, this.id) > 0;
187+
}
188+
182189
}

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/JdbcLockRegistry.java

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.springframework.dao.DataAccessResourceFailureException;
3131
import org.springframework.dao.TransientDataAccessException;
3232
import org.springframework.integration.support.locks.ExpirableLockRegistry;
33+
import org.springframework.integration.support.locks.RenewableLockRegistry;
3334
import org.springframework.integration.util.UUIDConverter;
3435
import org.springframework.transaction.TransactionTimedOutException;
3536
import org.springframework.util.Assert;
@@ -49,10 +50,11 @@
4950
* @author Bartosz Rempuszewski
5051
* @author Gary Russell
5152
* @author Alexandre Strubel
53+
* @author Stefan Vassilev
5254
*
5355
* @since 4.3
5456
*/
55-
public class JdbcLockRegistry implements ExpirableLockRegistry {
57+
public class JdbcLockRegistry implements ExpirableLockRegistry, RenewableLockRegistry {
5658

5759
private static final int DEFAULT_IDLE = 100;
5860

@@ -101,6 +103,19 @@ public void expireUnusedOlderThan(long age) {
101103
}
102104
}
103105

106+
@Override
107+
public void renewLock(Object lockKey) {
108+
Assert.isInstanceOf(String.class, lockKey);
109+
String path = pathFor((String) lockKey);
110+
JdbcLock jdbcLock = this.locks.get(path);
111+
if (jdbcLock == null) {
112+
throw new IllegalStateException("Could not found mutex at " + path);
113+
}
114+
if (!jdbcLock.renew()) {
115+
throw new IllegalStateException("Could not renew mutex at " + path);
116+
}
117+
}
118+
104119
private static final class JdbcLock implements Lock {
105120

106121
private final LockRepository mutex;
@@ -232,7 +247,7 @@ private boolean doLock() {
232247
@Override
233248
public void unlock() {
234249
if (!this.delegate.isHeldByCurrentThread()) {
235-
throw new IllegalMonitorStateException("You do not own mutex at " + this.path);
250+
throw new IllegalMonitorStateException("The current thread doesn't own mutex at " + this.path);
236251
}
237252
if (this.delegate.getHoldCount() > 1) {
238253
this.delegate.unlock();
@@ -243,7 +258,7 @@ public void unlock() {
243258
this.mutex.delete(this.path);
244259
return;
245260
}
246-
catch (TransientDataAccessException e) {
261+
catch (TransientDataAccessException | TransactionTimedOutException e) {
247262
// try again
248263
}
249264
catch (Exception e) {
@@ -264,6 +279,27 @@ public boolean isAcquiredInThisProcess() {
264279
return this.mutex.isAcquired(this.path);
265280
}
266281

282+
public boolean renew() {
283+
if (!this.delegate.isHeldByCurrentThread()) {
284+
throw new IllegalMonitorStateException("The current thread doesn't own mutex at " + this.path);
285+
}
286+
while (true) {
287+
try {
288+
boolean renewed = this.mutex.renew(this.path);
289+
if (renewed) {
290+
this.lastUsed = System.currentTimeMillis();
291+
}
292+
return renewed;
293+
}
294+
catch (TransientDataAccessException | TransactionTimedOutException e) {
295+
// try again
296+
}
297+
catch (Exception e) {
298+
throw new DataAccessResourceFailureException("Failed to renew mutex at " + this.path, e);
299+
}
300+
}
301+
}
302+
267303
}
268304

269305
}

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/LockRepository.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,6 +24,8 @@
2424
* has to be declared as a bean.
2525
*
2626
* @author Dave Syer
27+
* @author Alexandre Strubel
28+
*
2729
* @since 4.3
2830
*/
2931
public interface LockRepository extends Closeable {
@@ -34,6 +36,8 @@ public interface LockRepository extends Closeable {
3436

3537
boolean acquire(String lock);
3638

39+
boolean renew(String lock);
40+
3741
@Override
3842
void close();
3943

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDifferentClientTests.java

Lines changed: 116 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -34,29 +34,27 @@
3434

3535
import org.apache.commons.logging.Log;
3636
import org.apache.commons.logging.LogFactory;
37-
import org.junit.After;
38-
import org.junit.Before;
39-
import org.junit.Test;
40-
import org.junit.runner.RunWith;
37+
import org.junit.jupiter.api.AfterEach;
38+
import org.junit.jupiter.api.BeforeEach;
39+
import org.junit.jupiter.api.Test;
4140

4241
import org.springframework.beans.factory.annotation.Autowired;
4342
import org.springframework.context.ConfigurableApplicationContext;
4443
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
4544
import org.springframework.core.task.SimpleAsyncTaskExecutor;
4645
import org.springframework.test.annotation.DirtiesContext;
47-
import org.springframework.test.context.ContextConfiguration;
48-
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
46+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
4947
import org.springframework.util.StopWatch;
5048

5149
/**
5250
* @author Dave Syer
5351
* @author Artem Bilan
5452
* @author Glenn Renfro
53+
* @author Alexandre Strubel
5554
*
5655
* @since 4.3
5756
*/
58-
@ContextConfiguration("JdbcLockRegistryTests-context.xml")
59-
@RunWith(SpringJUnit4ClassRunner.class)
57+
@SpringJUnitConfig(locations = "JdbcLockRegistryTests-context.xml")
6058
@DirtiesContext
6159
public class JdbcLockRegistryDifferentClientTests {
6260

@@ -76,7 +74,7 @@ public class JdbcLockRegistryDifferentClientTests {
7674
@Autowired
7775
private DataSource dataSource;
7876

79-
@Before
77+
@BeforeEach
8078
public void clear() {
8179
this.registry.expireUnusedOlderThan(0);
8280
this.client.close();
@@ -86,7 +84,7 @@ public void clear() {
8684
this.child.refresh();
8785
}
8886

89-
@After
87+
@AfterEach
9088
public void close() {
9189
if (this.child != null) {
9290
this.child.close();
@@ -276,4 +274,111 @@ public void testExclusiveAccess() throws Exception {
276274
}
277275
}
278276

277+
@Test
278+
public void testOutOfDateLockTaken() throws Exception {
279+
DefaultLockRepository client1 = new DefaultLockRepository(dataSource);
280+
client1.setTimeToLive(500);
281+
client1.afterPropertiesSet();
282+
final DefaultLockRepository client2 = new DefaultLockRepository(dataSource);
283+
client2.afterPropertiesSet();
284+
Lock lock1 = new JdbcLockRegistry(client1).obtain("foo");
285+
final BlockingQueue<Integer> data = new LinkedBlockingQueue<>();
286+
final CountDownLatch latch1 = new CountDownLatch(1);
287+
final CountDownLatch latch2 = new CountDownLatch(1);
288+
lock1.lockInterruptibly();
289+
Thread.sleep(500);
290+
new SimpleAsyncTaskExecutor()
291+
.execute(() -> {
292+
Lock lock2 = new JdbcLockRegistry(client2).obtain("foo");
293+
try {
294+
latch1.countDown();
295+
StopWatch stopWatch = new StopWatch();
296+
stopWatch.start();
297+
lock2.lockInterruptibly();
298+
stopWatch.stop();
299+
data.add(1);
300+
}
301+
catch (InterruptedException e) {
302+
Thread.currentThread().interrupt();
303+
}
304+
finally {
305+
lock2.unlock();
306+
}
307+
latch2.countDown();
308+
});
309+
assertThat(latch1.await(10, TimeUnit.SECONDS)).isTrue();
310+
assertThat(latch2.await(10, TimeUnit.SECONDS)).isTrue();
311+
data.add(2);
312+
lock1.unlock();
313+
for (int i = 0; i < 2; i++) {
314+
Integer integer = data.poll(10, TimeUnit.SECONDS);
315+
assertThat(integer).isNotNull();
316+
assertThat(integer.intValue()).isEqualTo(i + 1);
317+
}
318+
}
319+
320+
@Test
321+
public void testRenewLock() throws Exception {
322+
DefaultLockRepository client1 = new DefaultLockRepository(dataSource);
323+
client1.setTimeToLive(500);
324+
client1.afterPropertiesSet();
325+
final DefaultLockRepository client2 = new DefaultLockRepository(dataSource);
326+
client2.afterPropertiesSet();
327+
JdbcLockRegistry registry = new JdbcLockRegistry(client1);
328+
Lock lock1 = registry.obtain("foo");
329+
final BlockingQueue<Integer> data = new LinkedBlockingQueue<>();
330+
final CountDownLatch latch1 = new CountDownLatch(2);
331+
final CountDownLatch latch2 = new CountDownLatch(1);
332+
lock1.lockInterruptibly();
333+
new SimpleAsyncTaskExecutor()
334+
.execute(() -> {
335+
Lock lock2 = new JdbcLockRegistry(client2).obtain("foo");
336+
try {
337+
latch1.countDown();
338+
StopWatch stopWatch = new StopWatch();
339+
stopWatch.start();
340+
lock2.lockInterruptibly();
341+
stopWatch.stop();
342+
data.add(4);
343+
Thread.sleep(10);
344+
data.add(5);
345+
Thread.sleep(10);
346+
data.add(6);
347+
}
348+
catch (InterruptedException e) {
349+
Thread.currentThread().interrupt();
350+
}
351+
finally {
352+
lock2.unlock();
353+
}
354+
});
355+
new SimpleAsyncTaskExecutor()
356+
.execute(() -> {
357+
try {
358+
latch1.countDown();
359+
Thread.sleep(1000);
360+
data.add(1);
361+
Thread.sleep(100);
362+
data.add(2);
363+
Thread.sleep(100);
364+
data.add(3);
365+
latch2.countDown();
366+
}
367+
catch (InterruptedException e) {
368+
Thread.currentThread().interrupt();
369+
}
370+
});
371+
assertThat(latch1.await(10, TimeUnit.SECONDS)).isTrue();
372+
while (latch2.getCount() > 0) {
373+
Thread.sleep(100);
374+
registry.renewLock("foo");
375+
}
376+
lock1.unlock();
377+
for (int i = 0; i < 6; i++) {
378+
Integer integer = data.poll(10, TimeUnit.SECONDS);
379+
assertThat(integer).isNotNull();
380+
assertThat(integer.intValue()).isEqualTo(i + 1);
381+
}
382+
}
383+
279384
}

0 commit comments

Comments
 (0)