diff --git a/pom.xml b/pom.xml index 9a3ac6f487..a50dd4539b 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-redis - 2.6.0-SNAPSHOT + 2.6.0-GH-1834-SNAPSHOT Spring Data Redis diff --git a/src/test/java/org/springframework/data/redis/core/AbstractOperationsTestParams.java b/src/test/java/org/springframework/data/redis/core/AbstractOperationsTestParams.java index b9ff4cc686..a8292df24c 100644 --- a/src/test/java/org/springframework/data/redis/core/AbstractOperationsTestParams.java +++ b/src/test/java/org/springframework/data/redis/core/AbstractOperationsTestParams.java @@ -15,8 +15,10 @@ */ package org.springframework.data.redis.core; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.List; import org.springframework.data.redis.DoubleObjectFactory; import org.springframework.data.redis.LongObjectFactory; @@ -27,6 +29,8 @@ import org.springframework.data.redis.StringObjectFactory; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.jedis.extension.JedisConnectionFactoryExtension; +import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; +import org.springframework.data.redis.connection.lettuce.extension.LettuceConnectionFactoryExtension; import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.GenericToStringSerializer; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; @@ -47,7 +51,11 @@ abstract public class AbstractOperationsTestParams { // DATAREDIS-241 public static Collection testParams() { - return testParams(JedisConnectionFactoryExtension.getConnectionFactory(RedisStanalone.class)); + + List params = new ArrayList<>(); + params.addAll(testParams(LettuceConnectionFactoryExtension.getConnectionFactory(RedisStanalone.class))); + params.addAll(testParams(JedisConnectionFactoryExtension.getConnectionFactory(RedisStanalone.class))); + return params; } // DATAREDIS-241 diff --git a/src/test/java/org/springframework/data/redis/listener/PubSubResubscribeTests.java b/src/test/java/org/springframework/data/redis/listener/PubSubResubscribeTests.java index bceb23421e..76d12a5187 100644 --- a/src/test/java/org/springframework/data/redis/listener/PubSubResubscribeTests.java +++ b/src/test/java/org/springframework/data/redis/listener/PubSubResubscribeTests.java @@ -15,20 +15,17 @@ */ package org.springframework.data.redis.listener; -import static org.assertj.core.api.Assertions.*; +import static org.awaitility.Awaitility.*; import static org.junit.Assume.*; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.HashSet; -import java.util.LinkedHashSet; import java.util.List; -import java.util.Set; import java.util.UUID; import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.junit.jupiter.api.AfterEach; @@ -156,18 +153,7 @@ void testContainerPatternResubscribe() throws Exception { template.convertAndSend(CHANNEL, payload1); template.convertAndSend(ANOTHER_CHANNEL, payload2); - // anotherListener receives both messages - List msgs = new ArrayList<>(); - msgs.add(bag2.poll(500, TimeUnit.MILLISECONDS)); - msgs.add(bag2.poll(500, TimeUnit.MILLISECONDS)); - - assertThat(msgs.size()).isEqualTo(2); - assertThat(msgs).contains(payload1); - assertThat(msgs).contains(payload2); - msgs.clear(); - - // unsubscribed adapter did not receive message - assertThat(bag.poll(500, TimeUnit.MILLISECONDS)).isNull(); + await().atMost(Duration.ofSeconds(2)).until(() -> bag2.contains(payload1) && bag2.contains(payload2)); // bind original listener on another channel container.addMessageListener(adapter, new ChannelTopic(ANOTHER_CHANNEL)); @@ -178,21 +164,10 @@ void testContainerPatternResubscribe() throws Exception { template.convertAndSend(CHANNEL, payload1); template.convertAndSend(ANOTHER_CHANNEL, payload2); - // original listener received only one message on another channel - msgs.clear(); - msgs.add(bag.poll(500, TimeUnit.MILLISECONDS)); - msgs.add(bag.poll(500, TimeUnit.MILLISECONDS)); - - assertThat(msgs).contains(payload2); - assertThat(msgs.contains(null)).isTrue(); + await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains(payload2)); // another listener receives messages on both channels - msgs.clear(); - msgs.add(bag2.poll(500, TimeUnit.MILLISECONDS)); - msgs.add(bag2.poll(500, TimeUnit.MILLISECONDS)); - assertThat(msgs.size()).isEqualTo(2); - assertThat(msgs).contains(payload1); - assertThat(msgs).contains(payload2); + await().atMost(Duration.ofSeconds(2)).until(() -> bag2.contains(payload1) && bag2.contains(payload2)); } @ParameterizedRedisTest @@ -222,15 +197,7 @@ void testContainerChannelResubscribe() throws Exception { template.convertAndSend(ANOTHER_CHANNEL, anotherPayload1); template.convertAndSend(ANOTHER_CHANNEL, anotherPayload2); - Set set = new LinkedHashSet<>(); - set.add(bag.poll(500, TimeUnit.MILLISECONDS)); - set.add(bag.poll(500, TimeUnit.MILLISECONDS)); - - assertThat(set.contains(payload1)).isFalse(); - assertThat(set.contains(payload2)).isFalse(); - - assertThat(set.contains(anotherPayload1)).isTrue(); - assertThat(set.contains(anotherPayload2)).isTrue(); + await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains(anotherPayload1) && bag.contains(anotherPayload2)); } /** @@ -246,8 +213,8 @@ void testInitializeContainerWithMultipleTopicsIncludingPattern() throws Exceptio container.stop(); - String uniqueChannel = "random-" + UUID.randomUUID().toString(); - PubSubAwaitUtil.runAndAwaitChannelSubscription(template.getConnectionFactory(), uniqueChannel, () -> { + String uniqueChannel = "random-" + UUID.randomUUID(); + PubSubAwaitUtil.runAndAwaitPatternSubscription(template.getConnectionFactory(), () -> { container.addMessageListener(adapter, Arrays.asList(new Topic[] { new ChannelTopic(uniqueChannel), new PatternTopic("s*") })); @@ -256,16 +223,12 @@ void testInitializeContainerWithMultipleTopicsIncludingPattern() throws Exceptio // timing: There's currently no other way to synchronize // than to hope the subscribe/unsubscribe are executed within the time. - Thread.sleep(50); + Thread.sleep(250); template.convertAndSend("somechannel", "HELLO"); template.convertAndSend(uniqueChannel, "WORLD"); - Set set = new LinkedHashSet<>(); - set.add(bag.poll(500, TimeUnit.MILLISECONDS)); - set.add(bag.poll(500, TimeUnit.MILLISECONDS)); - - assertThat(set).isEqualTo(new HashSet<>(Arrays.asList(new String[] { "HELLO", "WORLD" }))); + await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains("HELLO") && bag.contains("WORLD")); } private class MessageHandler { diff --git a/src/test/java/org/springframework/data/redis/listener/PubSubTestParams.java b/src/test/java/org/springframework/data/redis/listener/PubSubTestParams.java index 2856ba9a7d..e0452afd8f 100644 --- a/src/test/java/org/springframework/data/redis/listener/PubSubTestParams.java +++ b/src/test/java/org/springframework/data/redis/listener/PubSubTestParams.java @@ -21,7 +21,6 @@ import org.springframework.data.redis.ObjectFactory; import org.springframework.data.redis.Person; import org.springframework.data.redis.PersonObjectFactory; -import org.springframework.data.redis.RawObjectFactory; import org.springframework.data.redis.StringObjectFactory; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.connection.jedis.extension.JedisConnectionFactoryExtension; @@ -44,7 +43,6 @@ public static Collection testParams() { // create Jedis Factory ObjectFactory stringFactory = new StringObjectFactory(); ObjectFactory personFactory = new PersonObjectFactory(); - ObjectFactory rawFactory = new RawObjectFactory(); JedisConnectionFactory jedisConnFactory = JedisConnectionFactoryExtension .getNewConnectionFactory(RedisStanalone.class); @@ -78,7 +76,6 @@ public static Collection testParams() { parameters.add(new Object[] { personFactory, personTemplate }); parameters.add(new Object[] { stringFactory, stringTemplateLtc }); parameters.add(new Object[] { personFactory, personTemplateLtc }); - parameters.add(new Object[] { rawFactory, rawTemplateLtc }); if (clusterAvailable()) { diff --git a/src/test/java/org/springframework/data/redis/listener/PubSubTests.java b/src/test/java/org/springframework/data/redis/listener/PubSubTests.java index 94f07a58bc..5e397af918 100644 --- a/src/test/java/org/springframework/data/redis/listener/PubSubTests.java +++ b/src/test/java/org/springframework/data/redis/listener/PubSubTests.java @@ -17,12 +17,12 @@ import static org.assertj.core.api.Assertions.*; import static org.assertj.core.api.Assumptions.*; +import static org.awaitility.Awaitility.*; +import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.LinkedHashSet; -import java.util.Set; import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.Phaser; @@ -108,7 +108,7 @@ protected void doExecute(Runnable task) { container.start(); phaser.arriveAndAwaitAdvance(); - Thread.sleep(50); + Thread.sleep(250); } @AfterEach @@ -134,11 +134,7 @@ void testContainerSubscribe() throws Exception { template.convertAndSend(CHANNEL, payload1); template.convertAndSend(CHANNEL, payload2); - Set set = new LinkedHashSet<>(); - set.add((T) bag.poll(1, TimeUnit.SECONDS)); - set.add((T) bag.poll(1, TimeUnit.SECONDS)); - - assertThat(set).contains(payload1, payload2); + await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains(payload1) && bag.contains(payload2)); } @ParameterizedRedisTest @@ -192,10 +188,7 @@ void testStartListenersToNoSpecificChannelTest() throws InterruptedException { template.convertAndSend(CHANNEL, payload); - Set set = new LinkedHashSet<>(); - set.add((T) bag.poll(3, TimeUnit.SECONDS)); - - assertThat(set).contains(payload); + await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains(payload)); } private static boolean isClusterAware(RedisConnectionFactory connectionFactory) {