Skip to content

Commit 4d08e11

Browse files
NaccOllartembilan
authored andcommitted
Add Redis lock periodic renewal
Although `RenewableLockRegistry` provides a renew interface, it is inconvenient for users. Developers hope to have a lock that can be automatically renewed. On the one hand, it can avoid subsequent failures caused by locks that will not expire when abnormal exits, and on the other hand, it can avoid unlock failures caused by lock expired. * Add `RenewableLockRegistry.setRenewalTaskScheduler()` and when it is set, schedule a `renew()` script periodically when lock is acquired from Redis with `1/3` of `expireAfter` * Test and document the feature
1 parent 1de81b6 commit 4d08e11

File tree

5 files changed

+142
-6
lines changed

5 files changed

+142
-6
lines changed

spring-integration-core/src/main/java/org/springframework/integration/support/locks/RenewableLockRegistry.java

+14-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,12 +16,15 @@
1616

1717
package org.springframework.integration.support.locks;
1818

19+
import org.springframework.scheduling.TaskScheduler;
20+
1921
/**
2022
* A {@link LockRegistry} implementing this interface supports the renewal
2123
* of the time to live of a lock.
2224
*
2325
* @author Alexandre Strubel
2426
* @author Artem Bilan
27+
* @author Youbin Wu
2528
*
2629
* @since 5.4
2730
*/
@@ -34,4 +37,14 @@ public interface RenewableLockRegistry extends LockRegistry {
3437
*/
3538
void renewLock(Object lockKey);
3639

40+
/**
41+
* Set the {@link TaskScheduler} to use for the renewal task.
42+
* When renewalTaskScheduler is set, it will be used to periodically renew the lock to ensure that
43+
* the lock does not expire while the thread is working.
44+
* @param renewalTaskScheduler renew task scheduler
45+
* @since 6.4
46+
*/
47+
default void setRenewalTaskScheduler(TaskScheduler renewalTaskScheduler) {
48+
}
49+
3750
}

spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java

+79-4
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.redis.util;
1818

19+
import java.io.Serial;
1920
import java.text.SimpleDateFormat;
2021
import java.time.Duration;
2122
import java.util.Collections;
@@ -32,6 +33,7 @@
3233
import java.util.concurrent.ExecutorService;
3334
import java.util.concurrent.Executors;
3435
import java.util.concurrent.Future;
36+
import java.util.concurrent.ScheduledFuture;
3537
import java.util.concurrent.TimeUnit;
3638
import java.util.concurrent.TimeoutException;
3739
import java.util.concurrent.locks.Condition;
@@ -54,6 +56,8 @@
5456
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
5557
import org.springframework.data.redis.listener.Topic;
5658
import org.springframework.integration.support.locks.ExpirableLockRegistry;
59+
import org.springframework.integration.support.locks.RenewableLockRegistry;
60+
import org.springframework.scheduling.TaskScheduler;
5761
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
5862
import org.springframework.util.Assert;
5963
import org.springframework.util.ReflectionUtils;
@@ -89,11 +93,12 @@
8993
* @author Myeonghyeon Lee
9094
* @author Roman Zabaluev
9195
* @author Alex Peelman
96+
* @author Youbin Wu
9297
*
9398
* @since 4.0
9499
*
95100
*/
96-
public final class RedisLockRegistry implements ExpirableLockRegistry, DisposableBean {
101+
public final class RedisLockRegistry implements ExpirableLockRegistry, DisposableBean, RenewableLockRegistry {
97102

98103
private static final Log LOGGER = LogFactory.getLog(RedisLockRegistry.class);
99104

@@ -110,6 +115,9 @@ public final class RedisLockRegistry implements ExpirableLockRegistry, Disposabl
110115
private final Map<String, RedisLock> locks =
111116
new LinkedHashMap<>(16, 0.75F, true) {
112117

118+
@Serial
119+
private static final long serialVersionUID = 7419938441348450459L;
120+
113121
@Override
114122
protected boolean removeEldestEntry(Entry<String, RedisLock> eldest) {
115123
return size() > RedisLockRegistry.this.cacheCapacity;
@@ -138,6 +146,8 @@ protected boolean removeEldestEntry(Entry<String, RedisLock> eldest) {
138146
private Executor executor =
139147
Executors.newCachedThreadPool(new CustomizableThreadFactory("redis-lock-registry-"));
140148

149+
private TaskScheduler renewalTaskScheduler;
150+
141151
/**
142152
* Flag to denote whether the {@link ExecutorService} was provided via the setter and
143153
* thus should not be shutdown when {@link #destroy()} is called
@@ -207,6 +217,12 @@ public void setExecutor(Executor executor) {
207217
this.executorExplicitlySet = true;
208218
}
209219

220+
@Override
221+
public void setRenewalTaskScheduler(TaskScheduler renewalTaskScheduler) {
222+
Assert.notNull(renewalTaskScheduler, "'renewalTaskScheduler' must not be null");
223+
this.renewalTaskScheduler = renewalTaskScheduler;
224+
}
225+
210226
/**
211227
* Set the capacity of cached locks.
212228
* @param cacheCapacity The capacity of cached lock, (default 100_000).
@@ -291,6 +307,26 @@ public void destroy() {
291307
}
292308
}
293309

310+
@Override
311+
public void renewLock(Object lockKey) {
312+
String path = (String) lockKey;
313+
RedisLock redisLock;
314+
this.lock.lock();
315+
try {
316+
redisLock = this.locks.computeIfAbsent(path, getRedisLockConstructor(this.redisLockType));
317+
}
318+
finally {
319+
this.lock.unlock();
320+
}
321+
if (redisLock == null) {
322+
throw new IllegalStateException("Could not renew mutex at " + path);
323+
}
324+
325+
if (!redisLock.renew()) {
326+
throw new IllegalStateException("Could not renew mutex at " + path);
327+
}
328+
}
329+
294330
/**
295331
* The mode in which this registry is going to work with locks.
296332
*/
@@ -328,15 +364,28 @@ private abstract class RedisLock implements Lock {
328364
return false
329365
""";
330366

331-
protected static final RedisScript<Boolean>
332-
OBTAIN_LOCK_REDIS_SCRIPT = new DefaultRedisScript<>(OBTAIN_LOCK_SCRIPT, Boolean.class);
367+
private static final String RENEW_SCRIPT = """
368+
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
369+
redis.call('PEXPIRE', KEYS[1], ARGV[2])
370+
return true
371+
end
372+
return false
373+
""";
374+
375+
protected static final RedisScript<Boolean> OBTAIN_LOCK_REDIS_SCRIPT =
376+
new DefaultRedisScript<>(OBTAIN_LOCK_SCRIPT, Boolean.class);
377+
378+
public static final RedisScript<Boolean> RENEW_REDIS_SCRIPT =
379+
new DefaultRedisScript<>(RENEW_SCRIPT, Boolean.class);
333380

334381
protected final String lockKey;
335382

336383
private final ReentrantLock localLock = new ReentrantLock();
337384

338385
private volatile long lockedAt;
339386

387+
private volatile ScheduledFuture<?> renewFuture;
388+
340389
private RedisLock(String path) {
341390
this.lockKey = constructLockKey(path);
342391
}
@@ -454,6 +503,11 @@ private boolean tryRedisLock(long time) throws ExecutionException, InterruptedEx
454503
LOGGER.debug("Acquired lock; " + this);
455504
}
456505
this.lockedAt = System.currentTimeMillis();
506+
if (RedisLockRegistry.this.renewalTaskScheduler != null) {
507+
Duration delay = Duration.ofMillis(RedisLockRegistry.this.expireAfter / 3);
508+
this.renewFuture =
509+
RedisLockRegistry.this.renewalTaskScheduler.scheduleWithFixedDelay(this::renew, delay);
510+
}
457511
}
458512
return acquired;
459513
}
@@ -515,6 +569,7 @@ private void removeLockKey() {
515569

516570
if (Boolean.TRUE.equals(unlinkResult)) {
517571
// Lock key successfully unlinked
572+
stopRenew();
518573
return;
519574
}
520575
else if (Boolean.FALSE.equals(unlinkResult)) {
@@ -526,6 +581,26 @@ else if (Boolean.FALSE.equals(unlinkResult)) {
526581
throw new ConcurrentModificationException("Lock was released in the store due to expiration. " +
527582
"The integrity of data protected by this lock may have been compromised.");
528583
}
584+
else {
585+
stopRenew();
586+
}
587+
}
588+
589+
protected final boolean renew() {
590+
boolean res = Boolean.TRUE.equals(RedisLockRegistry.this.redisTemplate.execute(
591+
RENEW_REDIS_SCRIPT, Collections.singletonList(this.lockKey),
592+
RedisLockRegistry.this.clientId, String.valueOf(RedisLockRegistry.this.expireAfter)));
593+
if (!res) {
594+
stopRenew();
595+
}
596+
return res;
597+
}
598+
599+
protected final void stopRenew() {
600+
if (this.renewFuture != null) {
601+
this.renewFuture.cancel(true);
602+
this.renewFuture = null;
603+
}
529604
}
530605

531606
@Override
@@ -553,7 +628,7 @@ public int hashCode() {
553628
int result = 1;
554629
result = prime * result + getOuterType().hashCode();
555630
result = prime * result + ((this.lockKey == null) ? 0 : this.lockKey.hashCode());
556-
result = prime * result + (int) (this.lockedAt ^ (this.lockedAt >>> 32)); // NOSONAR magic number
631+
result = prime * result + Long.hashCode(this.lockedAt);
557632
result = prime * result + RedisLockRegistry.this.clientId.hashCode();
558633
return result;
559634
}

spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java

+44
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,10 @@
5151
import org.springframework.integration.redis.RedisContainerTest;
5252
import org.springframework.integration.redis.util.RedisLockRegistry.RedisLockType;
5353
import org.springframework.integration.test.util.TestUtils;
54+
import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler;
5455

5556
import static org.assertj.core.api.Assertions.assertThat;
57+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
5658
import static org.assertj.core.api.Assertions.assertThatNoException;
5759
import static org.assertj.core.api.Assertions.assertThatThrownBy;
5860
import static org.mockito.Mockito.mock;
@@ -66,6 +68,7 @@
6668
* @author Artem Vozhdayenko
6769
* @author Anton Gabov
6870
* @author Eddie Cho
71+
* @author Youbin Wu
6972
*
7073
* @since 4.0
7174
*
@@ -427,6 +430,20 @@ void testExceptionOnExpire(RedisLockType testRedisLockType) throws Exception {
427430
registry.destroy();
428431
}
429432

433+
@ParameterizedTest
434+
@EnumSource(RedisLockType.class)
435+
void testRenewalOnExpire(RedisLockType redisLockType) throws Exception {
436+
long expireAfter = 300L;
437+
RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey, expireAfter);
438+
registry.setRenewalTaskScheduler(new SimpleAsyncTaskScheduler());
439+
registry.setRedisLockType(redisLockType);
440+
Lock lock1 = registry.obtain("foo");
441+
assertThat(lock1.tryLock()).isTrue();
442+
Thread.sleep(expireAfter * 2);
443+
lock1.unlock();
444+
registry.destroy();
445+
}
446+
430447
@ParameterizedTest
431448
@EnumSource(RedisLockType.class)
432449
void testEquals(RedisLockType testRedisLockType) {
@@ -900,6 +917,33 @@ void testTwoThreadsRemoveAndObtainSameLockSimultaneously(RedisLockType testRedis
900917
registry.destroy();
901918
}
902919

920+
@ParameterizedTest
921+
@EnumSource(RedisLockType.class)
922+
void testLockRenew(RedisLockType redisLockType) {
923+
final RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey);
924+
registry.setRedisLockType(redisLockType);
925+
final Lock lock = registry.obtain("foo");
926+
927+
assertThat(lock.tryLock()).isTrue();
928+
try {
929+
registry.renewLock("foo");
930+
}
931+
finally {
932+
lock.unlock();
933+
}
934+
}
935+
936+
@ParameterizedTest
937+
@EnumSource(RedisLockType.class)
938+
void testLockRenewLockNotOwned(RedisLockType redisLockType) {
939+
final RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey);
940+
registry.setRedisLockType(redisLockType);
941+
registry.obtain("foo");
942+
943+
assertThatExceptionOfType(IllegalStateException.class)
944+
.isThrownBy(() -> registry.renewLock("foo"));
945+
}
946+
903947
@Test
904948
void testInitialiseWithCustomExecutor() {
905949
RedisLockRegistry redisLockRegistry = new RedisLockRegistry(redisConnectionFactory, "registryKey");

src/reference/antora/modules/ROOT/pages/redis.adoc

+4-1
Original file line numberDiff line numberDiff line change
@@ -856,4 +856,7 @@ Default.
856856
The pub-sub is preferred mode - less network chatter between client Redis server, and more performant - the lock is acquired immediately when subscription is notified about unlocking in the other process.
857857
However, the Redis does not support pub-sub in the Master/Replica connections (for example in AWS ElastiCache environment), therefore a busy-spin mode is chosen as a default to make the registry working in any environment.
858858

859-
Starting with version 6.4, instead of throwing `IllegalStateException`, the `RedisLockRegistry.RedisLock.unlock()` method throws `ConcurrentModificationException` if the ownership of the lock is expired.
859+
Starting with version 6.4, instead of throwing `IllegalStateException`, the `RedisLockRegistry.RedisLock.unlock()` method throws `ConcurrentModificationException` if the ownership of the lock is expired.
860+
861+
Starting with version 6.4, a `RedisLockRegistry.setRenewalTaskScheduler()` is added to configure the scheduler for periodic renewal of locks.
862+
When it is set, the lock will be automatically renewed every `1/3` of the expiration time after the lock is successfully acquired, until unlocked or the redis key is removed.

src/reference/antora/modules/ROOT/pages/whats-new.adoc

+1
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ See xref:zeromq.adoc[ZeroMQ Support] for more information.
5959
=== Redis Changes
6060

6161
Instead of throwing `IllegalStateException`, the `RedisLockRegistry.RedisLock.unlock()` method throws `ConcurrentModificationException` if the ownership of the lock is expired.
62+
Add a `RedisLockRegistry.setRenewalTaskScheduler()` to periodic lock renewal.
6263
See xref:redis.adoc[Redis Support] for more information.
6364

6465
[[x6.4-groovy-changes]]

0 commit comments

Comments
 (0)