diff --git a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java index 63b111528b..5629bac98f 100644 --- a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java +++ b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java @@ -40,6 +40,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.factory.BeanNameAware; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; @@ -55,6 +56,8 @@ import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.Subscription; import org.springframework.data.redis.connection.SubscriptionListener; +import org.springframework.data.redis.connection.jedis.JedisClusterConnection; +import org.springframework.data.redis.connection.jedis.JedisConnection; import org.springframework.data.redis.connection.util.ByteArrayWrapper; import org.springframework.data.redis.listener.adapter.RedisListenerExecutionFailedException; import org.springframework.data.redis.serializer.RedisSerializer; @@ -69,6 +72,9 @@ import org.springframework.util.backoff.BackOffExecution; import org.springframework.util.backoff.FixedBackOff; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisCluster; + /** * Container providing asynchronous behaviour for Redis message listeners. Handles the low level details of listening, * converting and message dispatching. @@ -101,6 +107,7 @@ * @author Thomas Darimont * @author Mark Paluch * @author John Blum + * @author Kenty Chen * @see MessageListener * @see SubscriptionListener */ @@ -1502,17 +1509,49 @@ protected void eventuallyPerformSubscription(RedisConnection connection, BackOff addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns, channels, () -> subscriptionDone.complete(null))); - this.executor.execute(() -> { - try { - doSubscribe(connection, patterns, initiallySubscribeToChannels); - closeConnection(); - unsubscribeFuture.complete(null); - } catch (Throwable cause) { - handleSubscriptionException(subscriptionDone, - nextBackoffExecution(backOffExecution, connection.isSubscribed()), cause); - } - }); + // for keyspace notification subscribtion start + if (connection instanceof JedisClusterConnection && isKeySpaceNotification(patterns)) { + DirectFieldAccessor dfa = new DirectFieldAccessor(connection); + JedisCluster cluster = (JedisCluster) dfa.getPropertyValue("cluster"); + cluster.getClusterNodes().forEach((key, pool) -> { + this.executor.execute(() -> { + try (Jedis jedis = new Jedis(pool.borrowObject())) { + String info = jedis.info("replication"); + if (info.contains("role:master")) { + logger.info("Start to psubscribe with: " + key); + doSubscribe(new JedisConnection(jedis), patterns, initiallySubscribeToChannels); + closeConnection(); + unsubscribeFuture.complete(null); + } + } catch (Throwable cause) { + handleSubscriptionException(subscriptionDone, + nextBackoffExecution(backOffExecution, connection.isSubscribed()), cause); + } + }); + }); + } + // for keyspace notification subscribtion end + else { + this.executor.execute(() -> { + + try { + doSubscribe(connection, patterns, initiallySubscribeToChannels); + closeConnection(); + unsubscribeFuture.complete(null); + } catch (Throwable cause) { + handleSubscriptionException(subscriptionDone, + nextBackoffExecution(backOffExecution, connection.isSubscribed()), cause); + } + }); + } + } + + private boolean isKeySpaceNotification(Collection patterns) { + if (patterns.size() != 1) + return false; + byte[] pattern = patterns.stream().findFirst().orElse(null); + return pattern != null && new String(pattern).contains("__keyspace@"); } } }