Skip to content

Commit 9cb8e73

Browse files
committed
Fix Pub/Sub Cluster connection re-activation #2534
We now eagerly obtain the cluster node identifier, before resubscribing to prevent command failures.
1 parent 58657be commit 9cb8e73

File tree

3 files changed

+50
-15
lines changed

3 files changed

+50
-15
lines changed

src/main/java/io/lettuce/core/cluster/StatefulRedisClusterPubSubConnectionImpl.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,9 +171,12 @@ public CompletableFuture<StatefulRedisPubSubConnection<K, V>> getConnectionAsync
171171

172172
@Override
173173
public void activated() {
174-
super.activated();
175174

176-
async.clusterMyId().thenAccept(this::setNodeId);
175+
if (!endpoint.isSubscribed()) {
176+
async.clusterMyId().thenAccept(this::setNodeId);
177+
}
178+
179+
super.activated();
177180
}
178181

179182
public void setPartitions(Partitions partitions) {

src/main/java/io/lettuce/core/pubsub/PubSubEndpoint.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@
1515
*/
1616
package io.lettuce.core.pubsub;
1717

18-
import java.util.*;
18+
import java.util.Arrays;
19+
import java.util.Collection;
20+
import java.util.HashSet;
21+
import java.util.LinkedHashSet;
22+
import java.util.List;
23+
import java.util.Set;
1924
import java.util.concurrent.ConcurrentHashMap;
2025
import java.util.concurrent.CopyOnWriteArrayList;
2126

@@ -192,7 +197,7 @@ private static boolean isAllowed(RedisCommand<?, ?, ?> command) {
192197
return ALLOWED_COMMANDS_SUBSCRIBED.contains(command.getType().name());
193198
}
194199

195-
private boolean isSubscribed() {
200+
public boolean isSubscribed() {
196201
return subscribeWritten && (hasChannelSubscriptions() || hasPatternSubscriptions());
197202
}
198203

src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,10 @@
1515
*/
1616
package io.lettuce.core.cluster.pubsub;
1717

18-
import static org.assertj.core.api.Assertions.assertThat;
18+
import static org.assertj.core.api.Assertions.*;
1919

20-
import java.time.Duration;
2120
import java.util.List;
22-
import java.util.Map;
2321
import java.util.concurrent.BlockingQueue;
24-
import java.util.concurrent.ConcurrentHashMap;
25-
import java.util.concurrent.CopyOnWriteArrayList;
2622
import java.util.concurrent.LinkedBlockingQueue;
2723

2824
import javax.inject.Inject;
@@ -34,7 +30,6 @@
3430

3531
import io.lettuce.core.RedisURI;
3632
import io.lettuce.core.TestSupport;
37-
import io.lettuce.core.api.push.PushMessage;
3833
import io.lettuce.core.api.sync.RedisCommands;
3934
import io.lettuce.core.cluster.RedisClusterClient;
4035
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
@@ -45,12 +40,13 @@
4540
import io.lettuce.core.cluster.pubsub.api.reactive.PubSubReactiveNodeSelection;
4641
import io.lettuce.core.cluster.pubsub.api.sync.NodeSelectionPubSubCommands;
4742
import io.lettuce.core.cluster.pubsub.api.sync.PubSubNodeSelection;
43+
import io.lettuce.core.event.command.CommandFailedEvent;
44+
import io.lettuce.core.event.command.CommandListener;
4845
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
4946
import io.lettuce.core.support.PubSubTestListener;
5047
import io.lettuce.test.LettuceExtension;
5148
import io.lettuce.test.TestFutures;
5249
import io.lettuce.test.Wait;
53-
import io.lettuce.test.condition.EnabledOnCommand;
5450

5551
/**
5652
* @author Mark Paluch
@@ -61,10 +57,13 @@ class RedisClusterPubSubConnectionIntegrationTests extends TestSupport {
6157
private final RedisClusterClient clusterClient;
6258

6359
private final PubSubTestListener connectionListener = new PubSubTestListener();
60+
6461
private final PubSubTestListener nodeListener = new PubSubTestListener();
6562

6663
private StatefulRedisClusterConnection<String, String> connection;
64+
6765
private StatefulRedisClusterPubSubConnection<String, String> pubSubConnection;
66+
6867
private StatefulRedisClusterPubSubConnection<String, String> pubSubConnection2;
6968

7069
@Inject
@@ -101,6 +100,34 @@ void testRegularClientPubSubChannels() {
101100
assertThat(channelsOnOtherNode).isEmpty();
102101
}
103102

103+
@Test
104+
void myIdWorksAfterDisconnect() throws InterruptedException {
105+
106+
BlockingQueue<CommandFailedEvent> failedEvents = new LinkedBlockingQueue<CommandFailedEvent>();
107+
108+
CommandListener listener = new CommandListener() {
109+
110+
@Override
111+
public void commandFailed(CommandFailedEvent event) {
112+
failedEvents.add(event);
113+
}
114+
115+
};
116+
clusterClient.addListener(listener);
117+
118+
StatefulRedisClusterPubSubConnection<String, String> pubsub = clusterClient.connectPubSub();
119+
pubsub.sync().subscribe("foo");
120+
pubsub.async().quit();
121+
122+
Thread.sleep(100);
123+
Wait.untilTrue(pubsub::isOpen).waitOrTimeout();
124+
125+
pubsub.close();
126+
clusterClient.removeListener(listener);
127+
128+
assertThat(failedEvents).isEmpty();
129+
}
130+
104131
@Test
105132
void testRegularClientPublish() throws Exception {
106133

@@ -164,8 +191,7 @@ void testGetConnectionAsyncByNodeId() {
164191
RedisClusterNode partition = pubSubConnection.getPartitions().getPartition(0);
165192

166193
StatefulRedisPubSubConnection<String, String> node = TestFutures
167-
.getOrTimeout(pubSubConnection.getConnectionAsync(partition
168-
.getNodeId()));
194+
.getOrTimeout(pubSubConnection.getConnectionAsync(partition.getNodeId()));
169195

170196
assertThat(node.sync().ping()).isEqualTo("PONG");
171197
}
@@ -177,8 +203,7 @@ void testGetConnectionAsyncByHostAndPort() {
177203

178204
RedisURI uri = partition.getUri();
179205
StatefulRedisPubSubConnection<String, String> node = TestFutures
180-
.getOrTimeout(pubSubConnection.getConnectionAsync(uri.getHost(),
181-
uri.getPort()));
206+
.getOrTimeout(pubSubConnection.getConnectionAsync(uri.getHost(), uri.getPort()));
182207

183208
assertThat(node.sync().ping()).isEqualTo("PONG");
184209
}
@@ -297,6 +322,7 @@ void testClusterListener() throws Exception {
297322
public void message(RedisClusterNode node, String pattern, String channel, String message) {
298323
nodes.add(node);
299324
}
325+
300326
});
301327

302328
PubSubNodeSelection<String, String> masters = pubSubConnection.sync().masters();
@@ -326,4 +352,5 @@ private RedisClusterNode getOtherThan(String nodeId) {
326352

327353
throw new IllegalStateException("No other nodes than " + nodeId + " available");
328354
}
355+
329356
}

0 commit comments

Comments
 (0)