Skip to content

Commit 6c769ca

Browse files
authored
GH-3700: Fix Redis lock retry until expiration
Fixes #3700 Fixed a case where lock acquisition attempt was abandoned earlier than expected time. If a vm uses `tryLock()` and another vm unlock it, it will wake up with an unlock message and try to acquire the lock, but if it fails(the other vm obtain lock), vm will return false.
1 parent 97a5ea4 commit 6c769ca

File tree

2 files changed

+79
-17
lines changed

2 files changed

+79
-17
lines changed

spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2021 the original author or authors.
2+
* Copyright 2014-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -359,31 +359,39 @@ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
359359
}
360360

361361
private boolean subscribeLock(long time) throws ExecutionException, InterruptedException {
362-
if (!obtainLock()) {
363-
if (!RedisLockRegistry.this.redisMessageListenerContainer.isRunning()) {
364-
RedisLockRegistry.this.redisMessageListenerContainer.afterPropertiesSet();
365-
RedisLockRegistry.this.redisMessageListenerContainer.start();
366-
}
362+
final long expiredTime = System.currentTimeMillis() + time;
363+
if (obtainLock()) {
364+
return true;
365+
}
366+
367+
if (!RedisLockRegistry.this.redisMessageListenerContainer.isRunning()) {
368+
RedisLockRegistry.this.redisMessageListenerContainer.afterPropertiesSet();
369+
RedisLockRegistry.this.redisMessageListenerContainer.start();
370+
}
371+
while (time == -1 || expiredTime >= System.currentTimeMillis()) {
367372
try {
368373
Future<String> future =
369374
RedisLockRegistry.this.unlockNotifyMessageListener.subscribeLock(this.lockKey);
370375
//DCL
371-
if (!obtainLock()) {
372-
try {
373-
//if short expireAfter key expire for ttl, no receive unlock msg
374-
long waitTime = time >= 0 ? time : RedisLockRegistry.this.expireAfter;
375-
future.get(waitTime, TimeUnit.MILLISECONDS);
376-
}
377-
catch (TimeoutException ignore) {
378-
}
379-
return obtainLock();
376+
if (obtainLock()) {
377+
return true;
378+
}
379+
try {
380+
//if short expireAfter key expire for ttl, no receive unlock msg
381+
long waitTime = time >= 0 ? time : RedisLockRegistry.this.expireAfter;
382+
future.get(waitTime, TimeUnit.MILLISECONDS);
383+
}
384+
catch (TimeoutException ignore) {
385+
}
386+
if (obtainLock()) {
387+
return true;
380388
}
381389
}
382390
finally {
383391
RedisLockRegistry.this.unlockNotifyMessageListener.unSubscribeLock(this.lockKey);
384392
}
385393
}
386-
return true;
394+
return false;
387395
}
388396

389397
private boolean obtainLock() {

spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2021 the original author or authors.
2+
* Copyright 2014-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -692,6 +692,60 @@ public void multiRedisLockRegistryTest() throws InterruptedException, ExecutionE
692692
}
693693

694694

695+
@Test
696+
@RedisAvailable
697+
public void earlyWakeUpTest() throws InterruptedException {
698+
final int THREAD_CNT = 2;
699+
final String testKey = "testKey";
700+
701+
final CountDownLatch tryLockReady = new CountDownLatch(THREAD_CNT);
702+
final CountDownLatch awaitTimeout = new CountDownLatch(THREAD_CNT);
703+
final RedisConnectionFactory connectionFactory = getConnectionFactoryForTest();
704+
final RedisLockRegistry registry1 = new RedisLockRegistry(connectionFactory, this.registryKey);
705+
final RedisLockRegistry registry2 = new RedisLockRegistry(connectionFactory, this.registryKey);
706+
final RedisLockRegistry registry3 = new RedisLockRegistry(connectionFactory, this.registryKey);
707+
final ExecutorService executorService = Executors.newFixedThreadPool(THREAD_CNT);
708+
709+
Lock lock1 = registry1.obtain(testKey);
710+
Lock lock2 = registry2.obtain(testKey);
711+
Lock lock3 = registry3.obtain(testKey);
712+
AtomicInteger expectOne = new AtomicInteger();
713+
714+
lock1.lock();
715+
executorService.submit(() -> {
716+
try {
717+
tryLockReady.countDown();
718+
boolean b = lock2.tryLock(10, TimeUnit.SECONDS);
719+
awaitTimeout.countDown();
720+
if (b) {
721+
expectOne.incrementAndGet();
722+
}
723+
}
724+
catch (InterruptedException ignore) {
725+
}
726+
});
727+
728+
executorService.submit(() -> {
729+
try {
730+
tryLockReady.countDown();
731+
boolean b = lock3.tryLock(10, TimeUnit.SECONDS);
732+
awaitTimeout.countDown();
733+
if (b) {
734+
expectOne.incrementAndGet();
735+
}
736+
}
737+
catch (InterruptedException ignore) {
738+
}
739+
});
740+
741+
assertThat(tryLockReady.await(10, TimeUnit.SECONDS)).isTrue();
742+
lock1.unlock();
743+
assertThat(awaitTimeout.await(1, TimeUnit.SECONDS)).isFalse();
744+
assertThat(expectOne.get()).isEqualTo(1);
745+
executorService.shutdown();
746+
}
747+
748+
695749
@SuppressWarnings({ "unchecked", "rawtypes" })
696750
@Test
697751
public void testUlink() {

0 commit comments

Comments
 (0)