diff --git a/pom.xml b/pom.xml index a67dc48660..a0f8c59438 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-redis - 3.2.0-SNAPSHOT + 3.2.0-GH-2656-SNAPSHOT Spring Data Redis Spring Data module for Redis diff --git a/src/main/java/org/springframework/data/redis/connection/Message.java b/src/main/java/org/springframework/data/redis/connection/Message.java index d33ce3a7fd..4e545dba2b 100644 --- a/src/main/java/org/springframework/data/redis/connection/Message.java +++ b/src/main/java/org/springframework/data/redis/connection/Message.java @@ -17,8 +17,6 @@ import java.io.Serializable; -import org.springframework.lang.Nullable; - /** * Class encapsulating a Redis message body and its properties. * diff --git a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java index b0f2042e00..2b6fe773a9 100644 --- a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java +++ b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java @@ -671,7 +671,7 @@ else if (topic instanceof PatternTopic) { if (wasListening) { CompletableFuture future = new CompletableFuture<>(); - getRequiredSubscriber().addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronizion(patterns, + getRequiredSubscriber().addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns, channels, () -> future.complete(null))); getRequiredSubscriber().subscribeChannel(channels.toArray(new byte[channels.size()][])); getRequiredSubscriber().subscribePattern(patterns.toArray(new byte[patterns.size()][])); @@ -1212,7 +1212,7 @@ public CompletableFuture initialize(BackOffExecution backOffExecution, Col void eventuallyPerformSubscription(RedisConnection connection, BackOffExecution backOffExecution, CompletableFuture subscriptionDone, Collection patterns, Collection channels) { - addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronizion(patterns, channels, + addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns, channels, () -> subscriptionDone.complete(null))); doSubscribe(connection, patterns, channels); @@ -1240,7 +1240,7 @@ void doSubscribe(RedisConnection connection, Collection patterns, Collec } } - void addSynchronization(SynchronizingMessageListener.SubscriptionSynchronizion synchronizer) { + void addSynchronization(SynchronizingMessageListener.SubscriptionSynchronization synchronizer) { this.synchronizingMessageListener.addSynchronization(synchronizer); } @@ -1413,7 +1413,7 @@ protected void eventuallyPerformSubscription(RedisConnection connection, BackOff initiallySubscribeToChannels = Collections.emptySet(); // perform channel subscription later as the first call to (p)subscribe blocks the client addSynchronization( - new SynchronizingMessageListener.SubscriptionSynchronizion(patterns, Collections.emptySet(), () -> { + new SynchronizingMessageListener.SubscriptionSynchronization(patterns, Collections.emptySet(), () -> { try { subscribeChannel(channels.toArray(new byte[0][])); } catch (Exception e) { @@ -1424,7 +1424,7 @@ protected void eventuallyPerformSubscription(RedisConnection connection, BackOff initiallySubscribeToChannels = channels; } - addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronizion(patterns, channels, + addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns, channels, () -> subscriptionDone.complete(null))); executor.execute(() -> { diff --git a/src/main/java/org/springframework/data/redis/listener/SynchronizingMessageListener.java b/src/main/java/org/springframework/data/redis/listener/SynchronizingMessageListener.java index a03e10d336..878cd447c8 100644 --- a/src/main/java/org/springframework/data/redis/listener/SynchronizingMessageListener.java +++ b/src/main/java/org/springframework/data/redis/listener/SynchronizingMessageListener.java @@ -24,7 +24,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.BiFunction; -import java.util.stream.Collectors; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; @@ -34,7 +33,7 @@ /** * Synchronizing {@link MessageListener} and {@link SubscriptionListener} that allows notifying a {@link Runnable} - * (through {@link SubscriptionSynchronizion}) upon completing subscriptions to channels or patterns. + * (through {@link SubscriptionSynchronization}) upon completing subscriptions to channels or patterns. * * @author Mark Paluch * @since 3.0 @@ -43,7 +42,7 @@ class SynchronizingMessageListener implements MessageListener, SubscriptionListe private final MessageListener messageListener; private final SubscriptionListener subscriptionListener; - private final List synchronizations = new CopyOnWriteArrayList<>(); + private final List synchronizations = new CopyOnWriteArrayList<>(); public SynchronizingMessageListener(MessageListener messageListener, SubscriptionListener subscriptionListener) { this.messageListener = messageListener; @@ -51,11 +50,11 @@ public SynchronizingMessageListener(MessageListener messageListener, Subscriptio } /** - * Register a {@link SubscriptionSynchronizion}. + * Register a {@link SubscriptionSynchronization}. * * @param synchronization must not be {@literal null}. */ - public void addSynchronization(SubscriptionSynchronizion synchronization) { + public void addSynchronization(SubscriptionSynchronization synchronization) { this.synchronizations.add(synchronization); } @@ -68,7 +67,7 @@ public void onMessage(Message message, @Nullable byte[] pattern) { public void onChannelSubscribed(byte[] channel, long count) { subscriptionListener.onChannelSubscribed(channel, count); - handleSubscription(channel, SubscriptionSynchronizion::onChannelSubscribed); + handleSubscription(channel, SubscriptionSynchronization::onChannelSubscribed); } @Override @@ -80,7 +79,7 @@ public void onChannelUnsubscribed(byte[] channel, long count) { public void onPatternSubscribed(byte[] pattern, long count) { subscriptionListener.onPatternSubscribed(pattern, count); - handleSubscription(pattern, SubscriptionSynchronizion::onPatternSubscribed); + handleSubscription(pattern, SubscriptionSynchronization::onPatternSubscribed); } @Override @@ -89,16 +88,16 @@ public void onPatternUnsubscribed(byte[] pattern, long count) { } void handleSubscription(byte[] topic, - BiFunction synchronizerCallback) { + BiFunction synchronizerCallback) { if (synchronizations.isEmpty()) { return; } ByteArrayWrapper binaryChannel = new ByteArrayWrapper(topic); - List finalized = new ArrayList<>(synchronizations.size()); + List finalized = new ArrayList<>(synchronizations.size()); - for (SubscriptionSynchronizion synchronizer : synchronizations) { + for (SubscriptionSynchronization synchronizer : synchronizations) { if (synchronizerCallback.apply(synchronizer, binaryChannel)) { finalized.add(synchronizer); @@ -111,21 +110,22 @@ void handleSubscription(byte[] topic, /** * Synchronization to await subscriptions for channels and patterns. */ - static class SubscriptionSynchronizion { + static class SubscriptionSynchronization { - private static final AtomicIntegerFieldUpdater DONE = AtomicIntegerFieldUpdater - .newUpdater(SubscriptionSynchronizion.class, "done"); + private static final AtomicIntegerFieldUpdater DONE = AtomicIntegerFieldUpdater + .newUpdater(SubscriptionSynchronization.class, "done"); private static final int NOT_DONE = 0; private static final int DONE_DONE = 0; private volatile int done = NOT_DONE; - private final Set remainingPatterns; - private final Set remainingChannels; private final Runnable doneCallback; - public SubscriptionSynchronizion(Collection remainingPatterns, Collection remainingChannels, + private final Set remainingPatterns; + private final Set remainingChannels; + + public SubscriptionSynchronization(Collection remainingPatterns, Collection remainingChannels, Runnable doneCallback) { if (remainingPatterns.isEmpty()) { @@ -133,7 +133,7 @@ public SubscriptionSynchronizion(Collection remainingPatterns, Collectio } else { this.remainingPatterns = ConcurrentHashMap.newKeySet(remainingPatterns.size()); this.remainingPatterns - .addAll(remainingPatterns.stream().map(ByteArrayWrapper::new).collect(Collectors.toList())); + .addAll(remainingPatterns.stream().map(ByteArrayWrapper::new).toList()); } if (remainingChannels.isEmpty()) { @@ -141,7 +141,7 @@ public SubscriptionSynchronizion(Collection remainingPatterns, Collectio } else { this.remainingChannels = ConcurrentHashMap.newKeySet(remainingChannels.size()); this.remainingChannels - .addAll(remainingChannels.stream().map(ByteArrayWrapper::new).collect(Collectors.toList())); + .addAll(remainingChannels.stream().map(ByteArrayWrapper::new).toList()); } this.doneCallback = doneCallback;