diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java index ec02cd336e..46e23ff01f 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java @@ -59,6 +59,7 @@ * @author Hugo Wood * @author Artem Bilan * @author Sanghyeok An + * @author Mikhail Polivakha */ public final class KafkaTestUtils { @@ -81,13 +82,30 @@ private KafkaTestUtils() { * @param autoCommit the auto commit. * @param embeddedKafka a {@link EmbeddedKafkaBroker} instance. * @return the properties. + * @deprecated please, use {@link #consumerProps(EmbeddedKafkaBroker, String, boolean)} instead */ + @Deprecated(forRemoval = true, since = "4.0.0") public static Map consumerProps(String group, String autoCommit, EmbeddedKafkaBroker embeddedKafka) { return consumerProps(embeddedKafka.getBrokersAsString(), group, autoCommit); } + /** + * Set up test properties for an {@code } consumer. + * + * @param group the group id. + * @param autoCommit the auto commit. + * @param embeddedKafka a {@link EmbeddedKafkaBroker} instance. + * @return the properties. + * @since 4.0 + */ + public static Map consumerProps(EmbeddedKafkaBroker embeddedKafka, String group, + boolean autoCommit) { + + return consumerProps(embeddedKafka.getBrokersAsString(), group, autoCommit); + } + /** * Set up test properties for an {@code } consumer. * @param brokers the bootstrapServers property. @@ -96,7 +114,7 @@ public static Map consumerProps(String group, String autoCommit, * @since 3.3 */ public static Map consumerProps(String brokers, String group) { - return consumerProps(brokers, group, "false"); + return consumerProps(brokers, group, false); } /** @@ -114,8 +132,22 @@ public static Map producerProps(EmbeddedKafkaBroker embeddedKafk * @param group the group id. * @param autoCommit the auto commit. * @return the properties. - */ + * @deprecated Please, use {@link #consumerProps(String, String, boolean)} instead. + */ + @Deprecated(forRemoval = true, since = "4.0.0") public static Map consumerProps(String brokers, String group, String autoCommit) { + return consumerProps(brokers, group, Boolean.parseBoolean(autoCommit)); + } + + /** + * Set up test properties for an {@code } consumer. + * @param brokers the bootstrapServers property. + * @param group the group id. + * @param autoCommit the auto commit. + * @return the properties. + * @since 4.0 + */ + public static Map consumerProps(String brokers, String group, boolean autoCommit) { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); props.put(ConsumerConfig.GROUP_ID_CONFIG, group); @@ -235,7 +267,7 @@ public static ConsumerRecord getSingleRecord(Consumer consume public static ConsumerRecord getOneRecord(String brokerAddresses, String group, String topic, int partition, boolean seekToLast, boolean commit, Duration timeout) { - Map consumerConfig = consumerProps(brokerAddresses, group, "false"); + Map consumerConfig = consumerProps(brokerAddresses, group, false); consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1); try (KafkaConsumer consumer = new KafkaConsumer(consumerConfig)) { TopicPartition topicPart = new TopicPartition(topic, partition); diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/AddressableEmbeddedBrokerTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/AddressableEmbeddedBrokerTests.java index 8c58e651df..46c412116c 100644 --- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/AddressableEmbeddedBrokerTests.java +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/AddressableEmbeddedBrokerTests.java @@ -69,7 +69,7 @@ public void testKafkaEmbedded() { @Test public void testLateStartedConsumer() { - Map consumerProps = KafkaTestUtils.consumerProps(TEST_EMBEDDED, "false", this.broker); + Map consumerProps = KafkaTestUtils.consumerProps(this.broker, TEST_EMBEDDED, false); Consumer consumer = new KafkaConsumer<>(consumerProps); this.broker.consumeFromAnEmbeddedTopic(consumer, TEST_EMBEDDED); @@ -78,7 +78,7 @@ public void testLateStartedConsumer() { producer.close(); KafkaTestUtils.getSingleRecord(consumer, TEST_EMBEDDED); - consumerProps = KafkaTestUtils.consumerProps("another" + TEST_EMBEDDED, "false", this.broker); + consumerProps = KafkaTestUtils.consumerProps(this.broker, "another" + TEST_EMBEDDED, false); Consumer consumer2 = new KafkaConsumer<>(consumerProps); this.broker.consumeFromAnEmbeddedTopic(consumer2, TEST_EMBEDDED); KafkaTestUtils.getSingleRecord(consumer2, TEST_EMBEDDED); diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java index cc1b5e9412..b17a9c87cb 100644 --- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java @@ -51,7 +51,7 @@ void testConsumeFromEmbeddedWithSeekToEnd() { Map producerProps = KafkaTestUtils.producerProps(kafka); KafkaProducer producer = new KafkaProducer<>(producerProps); producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "beforeSeekToEnd")); - Map consumerProps = KafkaTestUtils.consumerProps("seekTest", "false", kafka); + Map consumerProps = KafkaTestUtils.consumerProps(kafka, "seekTest", false); KafkaConsumer consumer = new KafkaConsumer<>(consumerProps); kafka.consumeFromAnEmbeddedTopic(consumer, true /* seekToEnd */, "seekTestTopic"); producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "afterSeekToEnd")); diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/utils/KafkaTestUtilsTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/utils/KafkaTestUtilsTests.java index e5be343800..7037394b72 100644 --- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/utils/KafkaTestUtilsTests.java +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/utils/KafkaTestUtilsTests.java @@ -55,7 +55,7 @@ void testGetSingleWithMoreThanOneTopic(EmbeddedKafkaBroker broker) { producer.send(new ProducerRecord<>("singleTopic1", 0, 1, "foo")); producer.send(new ProducerRecord<>("singleTopic2", 0, 1, "foo")); producer.close(); - Map consumerProps = KafkaTestUtils.consumerProps("ktuTests1", "false", broker); + Map consumerProps = KafkaTestUtils.consumerProps(broker, "ktuTests1", false); KafkaConsumer consumer = new KafkaConsumer<>(consumerProps); broker.consumeFromAllEmbeddedTopics(consumer); KafkaTestUtils.getSingleRecord(consumer, "singleTopic1"); @@ -72,7 +72,7 @@ void testGetSingleWithMoreThanOneTopicRecordNotThereYet(EmbeddedKafkaBroker brok Map producerProps = KafkaTestUtils.producerProps(broker); KafkaProducer producer = new KafkaProducer<>(producerProps); producer.send(new ProducerRecord<>("singleTopic4", 0, 1, "foo")); - Map consumerProps = KafkaTestUtils.consumerProps("ktuTests2", "false", broker); + Map consumerProps = KafkaTestUtils.consumerProps(broker, "ktuTests2", false); KafkaConsumer consumer = new KafkaConsumer<>(consumerProps); broker.consumeFromEmbeddedTopics(consumer, "singleTopic4", "singleTopic5"); long t1 = System.currentTimeMillis(); @@ -117,7 +117,7 @@ public void testMultiMinRecords(EmbeddedKafkaBroker broker) { Map producerProps = KafkaTestUtils.producerProps(broker); KafkaProducer producer = new KafkaProducer<>(producerProps); producer.send(new ProducerRecord<>("multiTopic1", 0, 1, "foo")); - Map consumerProps = KafkaTestUtils.consumerProps("ktuTests3", "false", broker); + Map consumerProps = KafkaTestUtils.consumerProps(broker, "ktuTests3", false); KafkaConsumer consumer = new KafkaConsumer<>(consumerProps); broker.consumeFromAnEmbeddedTopic(consumer, "multiTopic1"); new Thread(() -> { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/AliasPropertiesTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/AliasPropertiesTests.java index dd043851cd..95a37dd4da 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/AliasPropertiesTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/AliasPropertiesTests.java @@ -157,9 +157,7 @@ public DefaultKafkaConsumerFactory consumerFactory() { @Bean public Map consumerConfigs() { - Map consumerProps = - KafkaTestUtils.consumerProps("myAliasGroup", "false", embeddedKafka()); - return consumerProps; + return KafkaTestUtils.consumerProps(embeddedKafka(), "myAliasGroup", false); } @Bean diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/AsyncListenerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/AsyncListenerTests.java index 1b90d4e2cc..5a00ade505 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/AsyncListenerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/AsyncListenerTests.java @@ -343,7 +343,7 @@ public DefaultKafkaConsumerFactory consumerFactory( @Bean public Map consumerConfigs(EmbeddedKafkaBroker embeddedKafka) { - return KafkaTestUtils.consumerProps("test", "false", embeddedKafka); + return KafkaTestUtils.consumerProps(embeddedKafka, "test", false); } @Bean diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversion2Tests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversion2Tests.java index af5477e44c..86a4979ee5 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversion2Tests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversion2Tests.java @@ -105,7 +105,7 @@ public DefaultKafkaConsumerFactory consumerFactory() { @Bean public Map consumerConfigs() { Map consumerProps = - KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false", this.embeddedKafka); + KafkaTestUtils.consumerProps(this.embeddedKafka, DEFAULT_TEST_GROUP_ID, false); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class); consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class); consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedFooProvider.class); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java index f631c8d789..14b6c7d796 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java @@ -199,7 +199,7 @@ public DefaultKafkaConsumerFactory consumerFactory(EmbeddedKafkaBr @Bean public Map consumerConfigs(EmbeddedKafkaBroker embeddedKafka) { Map consumerProps = - KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false", embeddedKafka); + KafkaTestUtils.consumerProps(embeddedKafka, DEFAULT_TEST_GROUP_ID, false); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class); consumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1000); consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index 552cc500aa..1e96699e66 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -1555,7 +1555,7 @@ private ConsumerFactory configuredConsumerFactory(String client @Bean public Map consumerConfigs() { - return KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false", this.embeddedKafka); + return KafkaTestUtils.consumerProps(this.embeddedKafka, DEFAULT_TEST_GROUP_ID, false); } @Bean diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java index cb6aede96d..0f4ae18ba7 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java @@ -367,7 +367,7 @@ public void testNestedTxProducerIsCached() throws Exception { DefaultKafkaProducerFactory pfTx = new DefaultKafkaProducerFactory<>(producerProps); pfTx.setTransactionIdPrefix("fooTx."); KafkaOperations templateTx = new KafkaTemplate<>(pfTx); - Map consumerProps = KafkaTestUtils.consumerProps("txCache1Group", "false", this.embeddedKafka); + Map consumerProps = KafkaTestUtils.consumerProps(this.embeddedKafka, "txCache1Group", false); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); AtomicReference> wrapped = new AtomicReference<>(); cf.addPostProcessor(consumer -> { @@ -418,7 +418,7 @@ public void testNestedTxProducerIsFixed() throws Exception { TransactionIdSuffixStrategy suffixStrategy = new DefaultTransactionIdSuffixStrategy(3); pfTx.setTransactionIdSuffixStrategy(suffixStrategy); KafkaOperations templateTx = new KafkaTemplate<>(pfTx); - Map consumerProps = KafkaTestUtils.consumerProps("txCache1FixedGroup", "false", this.embeddedKafka); + Map consumerProps = KafkaTestUtils.consumerProps(this.embeddedKafka, "txCache1FixedGroup", false); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); AtomicReference> wrapped = new AtomicReference<>(); cf.addPostProcessor(consumer -> { @@ -464,7 +464,7 @@ public void testNestedTxProducerIsFixed() throws Exception { @ParameterizedTest @ValueSource(booleans = { true, false }) void listener(boolean closeWithTimeout) { - Map consumerConfig = KafkaTestUtils.consumerProps("txCache1Group", "false", this.embeddedKafka); + Map consumerConfig = KafkaTestUtils.consumerProps(this.embeddedKafka, "txCache1Group", false); consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "foo-0"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(consumerConfig); List adds = new ArrayList<>(); @@ -503,7 +503,7 @@ public void consumerRemoved(String id, Consumer consumer) { void configDeserializer() { Deserializer key = mock(Deserializer.class); Deserializer value = mock(Deserializer.class); - Map config = KafkaTestUtils.consumerProps("mockGroup", "false", this.embeddedKafka); + Map config = KafkaTestUtils.consumerProps(this.embeddedKafka, "mockGroup", false); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(config, key, value); Deserializer keyDeserializer = cf.getKeyDeserializer(); assertThat(keyDeserializer).isSameAs(key); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java index 2853eeef1d..8f27655700 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java @@ -136,7 +136,7 @@ public void producerRemoved(String id, Producer producer) { public static void setUp() { embeddedKafka = EmbeddedKafkaCondition.getBroker(); Map consumerProps = KafkaTestUtils - .consumerProps("KafkaTemplatetests" + UUID.randomUUID(), "false", embeddedKafka); + .consumerProps(embeddedKafka, "KafkaTemplatetests" + UUID.randomUUID(), false); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); consumer = cf.createConsumer(); embeddedKafka.consumeFromAnEmbeddedTopic(consumer, INT_KEY_TOPIC); @@ -165,7 +165,7 @@ void testTemplate() { template.setDefaultTopic(INT_KEY_TOPIC); template.setConsumerFactory( - new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps("xx", "false", embeddedKafka))); + new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps(embeddedKafka, "xx", false))); ConsumerRecords initialRecords = template.receive(Collections.singleton(new TopicPartitionOffset(INT_KEY_TOPIC, 1, 1L))); assertThat(initialRecords).isEmpty(); @@ -475,7 +475,7 @@ void testTemplateDisambiguation() { pf.setKeySerializer(new StringSerializer()); KafkaTemplate template = new KafkaTemplate<>(pf, true); template.setDefaultTopic(STRING_KEY_TOPIC); - Map consumerProps = KafkaTestUtils.consumerProps("testTString", "false", embeddedKafka); + Map consumerProps = KafkaTestUtils.consumerProps(embeddedKafka, "testTString", false); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); cf.setKeyDeserializer(new StringDeserializer()); Consumer localConsumer = cf.createConsumer(); @@ -630,7 +630,7 @@ void testReceiveWhenOffsetIsInvalid(Long offset) { KafkaTemplate template = new KafkaTemplate<>(pf, true); template.setConsumerFactory( - new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps("xx", "false", embeddedKafka))); + new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps(embeddedKafka, "xx", false))); TopicPartitionOffset tpoWithNullOffset = new TopicPartitionOffset(INT_KEY_TOPIC, 1, offset); assertThatExceptionOfType(KafkaException.class) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java index 54a70f43d2..53003e3f96 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java @@ -115,7 +115,7 @@ public void testLocalTransaction() { pf.setKeySerializer(new StringSerializer()); KafkaTemplate template = new KafkaTemplate<>(pf); template.setDefaultTopic(STRING_KEY_TOPIC); - Map consumerProps = KafkaTestUtils.consumerProps("testLocalTx", "false", embeddedKafka); + Map consumerProps = KafkaTestUtils.consumerProps(embeddedKafka, "testLocalTx", false); consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); cf.setKeyDeserializer(new StringDeserializer()); @@ -173,7 +173,7 @@ public void testLocalTransactionIsFixed() { pf.setTransactionIdSuffixStrategy(suffixStrategy); KafkaTemplate template = new KafkaTemplate<>(pf); template.setDefaultTopic(STRING_KEY_TOPIC); - Map consumerProps = KafkaTestUtils.consumerProps("testLocalTxFixed", "false", embeddedKafka); + Map consumerProps = KafkaTestUtils.consumerProps(embeddedKafka, "testLocalTxFixed", false); consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); cf.setKeyDeserializer(new StringDeserializer()); @@ -232,7 +232,7 @@ public void testGlobalTransaction() { pf.setTransactionIdPrefix("my.transaction."); KafkaTemplate template = new KafkaTemplate<>(pf); template.setDefaultTopic(STRING_KEY_TOPIC); - Map consumerProps = KafkaTestUtils.consumerProps("testGlobalTx", "false", embeddedKafka); + Map consumerProps = KafkaTestUtils.consumerProps(embeddedKafka, "testGlobalTx", false); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); cf.setKeyDeserializer(new StringDeserializer()); Consumer consumer = cf.createConsumer(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateIntegrationTests.java index 0a45273957..2fdd976c76 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateIntegrationTests.java @@ -99,7 +99,7 @@ public class ReactiveKafkaProducerTemplateIntegrationTests { @BeforeAll public static void setUpBeforeClass() { Map consumerProps = KafkaTestUtils - .consumerProps("reactive_consumer_group", "false", EmbeddedKafkaCondition.getBroker()); + .consumerProps(EmbeddedKafkaCondition.getBroker(), "reactive_consumer_group", false); reactiveKafkaConsumerTemplate = new ReactiveKafkaConsumerTemplate<>(setupReceiverOptionsWithDefaultTopic(consumerProps)); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateTransactionIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateTransactionIntegrationTests.java index dda1889f2b..410854695b 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateTransactionIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateTransactionIntegrationTests.java @@ -100,7 +100,7 @@ public class ReactiveKafkaProducerTemplateTransactionIntegrationTests { @BeforeAll public static void setUpBeforeClass() { Map consumerProps = - KafkaTestUtils.consumerProps(CONSUMER_GROUP_ID, "false", EmbeddedKafkaCondition.getBroker()); + KafkaTestUtils.consumerProps(EmbeddedKafkaCondition.getBroker(), CONSUMER_GROUP_ID, false); reactiveKafkaConsumerTemplate = new ReactiveKafkaConsumerTemplate<>(setupReceiverOptionsWithDefaultTopic(consumerProps)); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java index bec23890fa..efefbd5151 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java @@ -150,7 +150,7 @@ ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFa @Bean ConsumerFactory consumerFactory() { - return new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps("test-group", "false", this.broker)); + return new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps(this.broker, "test-group", false)); } @Bean diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/AsyncAckAfterHandleTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/AsyncAckAfterHandleTests.java index d301b72b6b..8ec4329d6f 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/AsyncAckAfterHandleTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/AsyncAckAfterHandleTests.java @@ -110,7 +110,7 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerCon @Bean ConsumerFactory consumerFactory(EmbeddedKafkaBroker broker) { - Map props = KafkaTestUtils.consumerProps("asaac.grp", "false", broker); + Map props = KafkaTestUtils.consumerProps(broker, "asaac.grp", false); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3); return new DefaultKafkaConsumerFactory<>( props); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java index 274d56a2c7..e1fe24478f 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java @@ -130,7 +130,7 @@ public static void setup() { @Test public void testAutoCommit() throws Exception { this.logger.info("Start auto"); - Map props = KafkaTestUtils.consumerProps("test1", "true", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "test1", true); AtomicReference overrides = new AtomicReference<>(); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props) { @@ -253,7 +253,7 @@ else if (e instanceof ConcurrentContainerStoppedEvent concurrentContainerStopped @Test public void testAutoCommitWithRebalanceListener() throws Exception { this.logger.info("Start auto"); - Map props = KafkaTestUtils.consumerProps("test10", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "test10", false); AtomicReference overrides = new AtomicReference<>(); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props) { @@ -330,7 +330,7 @@ public void onPartitionsAssigned(Collection partitions) { @Test public void testAfterListenCommit() throws Exception { this.logger.info("Start manual"); - Map props = KafkaTestUtils.consumerProps("test2", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "test2", false); props.remove(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); AtomicReference overrides = new AtomicReference<>(); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props) { @@ -406,7 +406,7 @@ public void testManualCommit() throws Exception { private void testManualCommitGuts(ContainerProperties.AckMode ackMode, String topic, int qual) throws Exception { this.logger.info("Start " + ackMode); - Map props = KafkaTestUtils.consumerProps("test" + ackMode + qual, "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "test" + ackMode + qual, false); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic); final CountDownLatch latch = new CountDownLatch(4); @@ -451,7 +451,7 @@ public void testManualCommitExisting() throws Exception { template.sendDefault(0, "baz"); template.sendDefault(2, "qux"); template.flush(); - Map props = KafkaTestUtils.consumerProps("testManualExisting", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "testManualExisting", false); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic7); final CountDownLatch latch = new CountDownLatch(8); @@ -502,7 +502,7 @@ public void testManualCommitSyncExisting() throws Exception { template.sendDefault(0, "baz"); template.sendDefault(2, "qux"); template.flush(); - Map props = KafkaTestUtils.consumerProps("testManualExistingSync", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "testManualExistingSync", false); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props); ContainerProperties containerProps = new ContainerProperties(topic8); containerProps.setSyncCommits(true); @@ -540,7 +540,7 @@ public void testManualCommitSyncExisting() throws Exception { @Test public void testPausedStart() throws Exception { this.logger.info("Start paused start"); - Map props = KafkaTestUtils.consumerProps("test12", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "test12", false); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic12); @@ -630,7 +630,7 @@ public ConsumerRecords answer(InvocationOnMock invocation) thro @Test public void testListenerException() throws Exception { this.logger.info("Start exception"); - Map props = KafkaTestUtils.consumerProps("test1", "true", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "test1", true); props.remove(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic6); @@ -682,7 +682,7 @@ public boolean handleOne(Exception thrownException, ConsumerRecord record, @Test public void testAckOnErrorRecord() throws Exception { logger.info("Start ack on error"); - Map props = KafkaTestUtils.consumerProps("test9", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "test9", false); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); final CountDownLatch latch = new CountDownLatch(4); ContainerProperties containerProps = new ContainerProperties(topic9); @@ -766,7 +766,7 @@ public void testAckOnErrorManualImmediate() throws Exception { private void testAckOnErrorWithManualImmediateGuts(String topic, boolean ackOnError) throws Exception { logger.info("Start ack on error with ManualImmediate ack mode"); - Map props = KafkaTestUtils.consumerProps("testMan" + ackOnError, "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "testMan" + ackOnError, false); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props); final CountDownLatch latch = new CountDownLatch(2); ContainerProperties containerProps = new ContainerProperties(topic); @@ -826,8 +826,7 @@ private void testAckOnErrorWithManualImmediateGuts(String topic, boolean ackOnEr @Test public void testIsChildRunning() throws Exception { this.logger.info("Start isChildRunning"); - Map props = KafkaTestUtils.consumerProps("test1", "true", - embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "test1", true); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props) { @Override @@ -974,8 +973,7 @@ protected Consumer createKafkaConsumer(String groupId, String c @Test public void testContainerStartStop() throws Exception { this.logger.info("Start containerStartStop"); - Map props = KafkaTestUtils.consumerProps("test1", "true", - embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "test1", true); AtomicReference overrides = new AtomicReference<>(); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props) { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerEnforceRebalanceTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerEnforceRebalanceTests.java index 1dd127a24f..9db7156f04 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerEnforceRebalanceTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerEnforceRebalanceTests.java @@ -135,7 +135,7 @@ public void onPartitionsRevoked(Collection partitions) { @Bean ConsumerFactory cf() { return new DefaultKafkaConsumerFactory<>( - KafkaTestUtils.consumerProps("enforce-rebalance-topic", "false", this.broker)); + KafkaTestUtils.consumerProps(this.broker, "enforce-rebalance-topic", false)); } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerGroupSequencerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerGroupSequencerTests.java index c476913167..7a1e970980 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerGroupSequencerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerGroupSequencerTests.java @@ -154,7 +154,7 @@ KafkaTemplate template(ProducerFactory pf) { @Bean ConsumerFactory cf(EmbeddedKafkaBroker broker) { - return new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps("", "false", broker)); + return new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps(broker, "", false)); } @Bean diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerBatchIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerBatchIntegrationTests.java index 3d8396ae6e..9788e29ed1 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerBatchIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerBatchIntegrationTests.java @@ -76,7 +76,7 @@ public static void setup() { @Test public void recoveryAndDlt() throws Exception { - Map props = KafkaTestUtils.consumerProps("recoverBatch", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "recoverBatch", false); props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1000); props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); @@ -147,7 +147,7 @@ public void recoveryAndDlt() throws Exception { @Test public void recoveryFails() throws Exception { - Map props = KafkaTestUtils.consumerProps("recoverBatch2", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "recoverBatch2", false); props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1000); props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTests.java index 58c91cb749..a7f8fb4489 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTests.java @@ -246,7 +246,7 @@ ConsumerFactory consumerFactory() { Map props = KafkaTestUtils.consumerProps( this.broker.getBrokersAsString(), "DeliveryAttemptAwareRetryListenerIntegrationTestsGroupId", - "true"); + true); return new DefaultKafkaConsumerFactory<>(props); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java index b0198e2a84..f1b6f734e7 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java @@ -245,7 +245,7 @@ else if (r.key() == null && t.getCause() instanceof DeserializationException) { @Bean public ConsumerFactory cf() { - Map props = KafkaTestUtils.consumerProps(TOPIC + ".g1", "false", embeddedKafka()); + Map props = KafkaTestUtils.consumerProps(embeddedKafka(), TOPIC + ".g1", false); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ExtendedEHD.class.getName()); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class); props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, FailSometimesDeserializer.class); @@ -255,7 +255,7 @@ public ConsumerFactory cf() { @Bean public ConsumerFactory cfWithExplicitDeserializers() { - Map props = KafkaTestUtils.consumerProps(TOPIC + ".g2", "false", embeddedKafka()); + Map props = KafkaTestUtils.consumerProps(embeddedKafka(), TOPIC + ".g2", false); return new DefaultKafkaConsumerFactory<>(props, new ErrorHandlingDeserializer(new FailSometimesDeserializer()).keyDeserializer(true), new ErrorHandlingDeserializer(new FailSometimesDeserializer())); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerIntegrationTests.java index eb9899945b..2d0a6aee92 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerIntegrationTests.java @@ -78,7 +78,7 @@ public static void setup() { @Test public void testRetriesAndDlt() throws InterruptedException { - Map props = KafkaTestUtils.consumerProps("retryBatch", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "retryBatch", false); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic1); containerProps.setPollTimeout(10_000); @@ -163,7 +163,7 @@ public void publishEvent(Object event) { @Test public void testRetriesCantRecover() throws InterruptedException { - Map props = KafkaTestUtils.consumerProps("retryBatch2", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "retryBatch2", false); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic2); containerProps.setPollTimeout(10_000); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index 5b4919b2ee..603c223e3e 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -222,7 +222,7 @@ public static void setup() { @Test public void testDelegateType() throws Exception { - Map props = KafkaTestUtils.consumerProps("delegate", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "delegate", false); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic3); containerProps.setShutdownTimeout(60_000L); @@ -308,7 +308,7 @@ public void testDelegateType() throws Exception { @Test public void testNoResetPolicy() throws Exception { - Map props = KafkaTestUtils.consumerProps("delegate", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "delegate", false); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic17); @@ -334,7 +334,7 @@ public void testNoResetPolicy() throws Exception { @Test public void testListenerTypes() throws Exception { - Map props = KafkaTestUtils.consumerProps("lt1", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "lt1", false); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic4); @@ -475,7 +475,7 @@ public void testListenerTypes() throws Exception { @SuppressWarnings("unchecked") @Test public void testCommitsAreFlushedOnStop() throws Exception { - Map props = KafkaTestUtils.consumerProps("flushedOnStop", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "flushedOnStop", false); DefaultKafkaConsumerFactory cf = spy(new DefaultKafkaConsumerFactory<>(props)); AtomicReference> consumer = new AtomicReference<>(); willAnswer(inv -> { @@ -526,7 +526,7 @@ public void testCommitsAreFlushedOnStop() throws Exception { @Test public void testRecordAck() throws Exception { logger.info("Start record ack"); - Map props = KafkaTestUtils.consumerProps("test6", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "test6", false); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic6); @@ -1149,7 +1149,7 @@ public void testNonResponsiveConsumerEventNotIssuedWithActiveConsumer() throws E public void testBatchAck() throws Exception { logger.info("Start batch ack"); - Map props = KafkaTestUtils.consumerProps("test6", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "test6", false); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic7); containerProps.setMessageListener((MessageListener) message -> { @@ -1216,7 +1216,7 @@ public void testBatchAck() throws Exception { public void testBatchListener() throws Exception { logger.info("Start batch listener"); - Map props = KafkaTestUtils.consumerProps("test8", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "test8", false); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic8); containerProps.setMessageListener((BatchMessageListener) messages -> { @@ -1293,7 +1293,7 @@ public void testBatchListenerManual() throws Exception { template.sendDefault(1, 0, "qux"); template.flush(); - Map props = KafkaTestUtils.consumerProps("test9", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "test9", false); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic9); final CountDownLatch latch = new CountDownLatch(4); @@ -1353,7 +1353,7 @@ else if (entry.getValue().offset() == 2) { public void testBatchListenerErrors() throws Exception { logger.info("Start batch listener errors"); - Map props = KafkaTestUtils.consumerProps("test9", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "test9", false); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic10); containerProps.setMessageListener((BatchMessageListener) messages -> { @@ -1484,7 +1484,7 @@ public void onMessage(List> data) { @Test public void testSeekBatch() throws Exception { logger.info("Start seek batch seek"); - Map props = KafkaTestUtils.consumerProps("test16", "true", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "test16", true); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic16); final CountDownLatch registerLatch = new CountDownLatch(1); @@ -1538,11 +1538,11 @@ public void onIdleContainer(Map assignments, ConsumerSeekC } private static Stream testSeekParameters() { - Map noAutoCommit = KafkaTestUtils.consumerProps("test15", "true", embeddedKafka); + Map noAutoCommit = KafkaTestUtils.consumerProps(embeddedKafka, "test15", true); noAutoCommit.remove(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); // test false by default return Stream.of( - Arguments.of(KafkaTestUtils.consumerProps("test11", "false", embeddedKafka), topic11, false), - Arguments.of(KafkaTestUtils.consumerProps("test12", "true", embeddedKafka), topic12, true), + Arguments.of(KafkaTestUtils.consumerProps(embeddedKafka, "test11", false), topic11, false), + Arguments.of(KafkaTestUtils.consumerProps(embeddedKafka, "test12", true), topic12, true), Arguments.of(noAutoCommit, topic15, false)); } @@ -1675,7 +1675,7 @@ public void publishEvent(ApplicationEvent event) { @Test public void testDefinedPartitions() throws Exception { this.logger.info("Start defined parts"); - Map props = KafkaTestUtils.consumerProps("test13", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "test13", false); TopicPartitionOffset topic1Partition0 = new TopicPartitionOffset(topic13, 0, 0L); CountDownLatch initialConsumersLatch = new CountDownLatch(2); @@ -1986,7 +1986,7 @@ private void stubSetRunning(final CountDownLatch listenerConsumerAvailableLatch, @Test public void testManualAckRebalance() throws Exception { logger.info("Start manual ack rebalance"); - Map props = KafkaTestUtils.consumerProps("test14", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "test14", false); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic14); final List counts = new ArrayList<>(); @@ -2078,7 +2078,7 @@ public void onPartitionsAssigned(Collection partitions) { @Test public void testJsonSerDeConfiguredType() throws Exception { this.logger.info("Start JSON1"); - Map props = KafkaTestUtils.consumerProps("testJson", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "testJson", false); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Foo.class); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); @@ -2118,7 +2118,7 @@ public void testJsonSerDeConfiguredType() throws Exception { public void testJsonSerDeWithInstanceDoesNotUseConfiguration() throws Exception { this.logger.info("Start JSON1a"); Class consumerConfigValueDefaultType = Foo1.class; - Map props = KafkaTestUtils.consumerProps("testJson", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "testJson", false); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, consumerConfigValueDefaultType); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props, null, new JsonDeserializer<>(Foo.class)); @@ -2158,7 +2158,7 @@ public void testJsonSerDeWithInstanceDoesNotUseConfiguration() throws Exception @Test public void testJsonSerDeHeaderSimpleType() throws Exception { this.logger.info("Start JSON2"); - Map props = KafkaTestUtils.consumerProps("testJson", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "testJson", false); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); @@ -2200,7 +2200,7 @@ public void testJsonSerDeHeaderSimpleType() throws Exception { @Test public void testJsonSerDeTypeMappings() throws Exception { this.logger.info("Start JSON3"); - Map props = KafkaTestUtils.consumerProps("testJson", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "testJson", false); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); props.put(JsonDeserializer.TYPE_MAPPINGS, "foo:" + Foo1.class.getName() + " , bar:" + Bar1.class.getName()); @@ -2241,7 +2241,7 @@ public void testJsonSerDeTypeMappings() throws Exception { @Test public void testJsonSerDeIgnoreTypeHeadersInbound() throws Exception { this.logger.info("Start JSON4"); - Map props = KafkaTestUtils.consumerProps("testJson", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "testJson", false); props.put("spring.deserializer.value.delegate.class", "org.apache.kafka.common.serialization.StringDeserializer"); ErrorHandlingDeserializer errorHandlingDeserializer = @@ -2283,7 +2283,7 @@ public void testJsonSerDeIgnoreTypeHeadersInbound() throws Exception { @Test public void testStaticAssign() throws Exception { this.logger.info("Start static"); - Map props = KafkaTestUtils.consumerProps("testStatic", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "testStatic", false); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset(topic22, 0), @@ -2326,7 +2326,7 @@ public void testStaticAssign() throws Exception { @Test public void testPatternAssign() throws Exception { this.logger.info("Start pattern"); - Map props = KafkaTestUtils.consumerProps("testpattern", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "testpattern", false); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(Pattern.compile(topic23 + ".*")); @@ -2359,7 +2359,7 @@ public void testPatternAssign() throws Exception { @Test public void testBadListenerType() { - Map props = KafkaTestUtils.consumerProps("testStatic", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "testStatic", false); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties("foo"); containerProps.setMissingTopicsFatal(false); @@ -2381,7 +2381,7 @@ public void testBadListenerType() { @Test public void testBadAckMode() { - Map props = KafkaTestUtils.consumerProps("testStatic", "true", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "testStatic", true); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties("foo"); containerProps.setMissingTopicsFatal(false); @@ -2398,7 +2398,7 @@ public void testBadAckMode() { @Test public void testRebalanceAfterFailedRecord() throws Exception { logger.info("Start rebalance after failed record"); - Map props = KafkaTestUtils.consumerProps("test18", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "test18", false); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic18); final List counts = new ArrayList<>(); @@ -3003,7 +3003,7 @@ public void testExceptionWhenCommitAfterRebalance() throws Exception { final CountDownLatch consumeFirstLatch = new CountDownLatch(1); final CountDownLatch consumeLatch = new CountDownLatch(2); - Map props = KafkaTestUtils.consumerProps("test19", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "test19", false); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 3_000); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic19); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/MissingTopicCheckOverrideAdminConfigTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/MissingTopicCheckOverrideAdminConfigTests.java index 6ec62b5bf3..675f494fb0 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/MissingTopicCheckOverrideAdminConfigTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/MissingTopicCheckOverrideAdminConfigTests.java @@ -47,7 +47,7 @@ public class MissingTopicCheckOverrideAdminConfigTests { @Test void configOverride(EmbeddedKafkaBroker broker) { - Map consumerProps = KafkaTestUtils.consumerProps("grp", "false", broker); + Map consumerProps = KafkaTestUtils.consumerProps(broker, "grp", false); consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "junkjunk"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); ContainerProperties props = new ContainerProperties("mtccac"); @@ -70,7 +70,7 @@ public void checkTopics() { @Test void configOverrideDefault(EmbeddedKafkaBroker broker) { - Map consumerProps = KafkaTestUtils.consumerProps("grp", "false", broker); + Map consumerProps = KafkaTestUtils.consumerProps(broker, "grp", false); consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "junkjunk"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); ContainerProperties props = new ContainerProperties("mtccac"); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/MissingTopicsTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/MissingTopicsTests.java index a8832cc178..6a47add4de 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/MissingTopicsTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/MissingTopicsTests.java @@ -47,7 +47,7 @@ public static void setup() { @Test public void testMissingTopicCMLC() { - Map props = KafkaTestUtils.consumerProps("missing1", "true", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "missing1", true); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties("notexisting"); containerProps.setMessageListener((MessageListener) message -> { }); @@ -67,7 +67,7 @@ public void testMissingTopicCMLC() { @Test public void testMissingTopicKMLC() { - Map props = KafkaTestUtils.consumerProps("missing2", "true", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "missing2", true); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties("notexisting"); containerProps.setMessageListener((MessageListener) message -> { }); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/PauseContainerWhileErrorHandlerIsRetryingTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/PauseContainerWhileErrorHandlerIsRetryingTests.java index b2e45c2f53..f76db58605 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/PauseContainerWhileErrorHandlerIsRetryingTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/PauseContainerWhileErrorHandlerIsRetryingTests.java @@ -213,7 +213,7 @@ private ConsumerFactory makePausingAfterPollConsumerFactory(ConsumerFactory dele @Bean ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { DefaultKafkaConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory( - KafkaTestUtils.consumerProps("grp", "false", embeddedKafkaBroker) + KafkaTestUtils.consumerProps(embeddedKafkaBroker, "grp", false) ); ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setBatchListener(true); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentRecovererTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentRecovererTests.java index 1bd5d093bd..dc76ba5d80 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentRecovererTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentRecovererTests.java @@ -89,7 +89,7 @@ public static void setup() { @Test public void testMaxFailures() throws Exception { - Map props = KafkaTestUtils.consumerProps("seekTestMaxFailures", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "seekTestMaxFailures", false); props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props, null, new ErrorHandlingDeserializer<>(new JsonDeserializer<>(String.class))); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTests.java index 79be54d3b5..53b30885f3 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTests.java @@ -134,7 +134,7 @@ void withFilter() throws Exception { @Test void defaults() { - Map props = KafkaTestUtils.consumerProps("sbpp", "false", this.broker); + Map props = KafkaTestUtils.consumerProps(this.broker, "sbpp", false); ConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties("sbpp"); containerProps.setMessageListener(mock(MessageListener.class)); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java index 342dc97013..5ac4c4bdbd 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java @@ -494,7 +494,7 @@ public void testConsumeAndProduceTransactionExternalTM() throws Exception { @SuppressWarnings({ "unchecked"}) @Test public void testRollbackRecord() throws Exception { - Map props = KafkaTestUtils.consumerProps("txTest1", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "txTest1", false); props.put(ConsumerConfig.GROUP_ID_CONFIG, "group"); props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); @@ -599,7 +599,7 @@ public void testFixLagKTM() throws InterruptedException { @SuppressWarnings({"unchecked"}) private void testFixLagGuts(String topic, int whichTm) throws InterruptedException { - Map props = KafkaTestUtils.consumerProps("txTest2", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "txTest2", false); props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic); @@ -655,7 +655,7 @@ private void testFixLagGuts(String topic, int whichTm) throws InterruptedExcepti @Test public void testMaxFailures() throws Exception { String group = "groupInARBP"; - Map props = KafkaTestUtils.consumerProps(group, "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, group, false); props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic3); @@ -763,7 +763,7 @@ public void accept(ConsumerRecord record, Consumer consumer, Excepti @Test public void testBatchListenerMaxFailuresOnRecover() throws Exception { String group = "groupInARBP2"; - Map props = KafkaTestUtils.consumerProps(group, "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, group, false); props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic8); @@ -880,7 +880,7 @@ public void accept(ConsumerRecord record, Consumer consumer, Excepti @SuppressWarnings("unchecked") @Test public void testRollbackProcessorCrash() throws Exception { - Map props = KafkaTestUtils.consumerProps("testRollbackNoRetries", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "testRollbackNoRetries", false); props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic4); @@ -941,7 +941,7 @@ public void testRollbackProcessorCrash() throws Exception { @Test public void testBatchListenerRecoverAfterRollbackProcessorCrash() throws Exception { String group = "testBatchListenerRollbackNoRetries"; - Map props = KafkaTestUtils.consumerProps(group, "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, group, false); props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); @@ -1095,7 +1095,7 @@ void testArbpWithoutRecovery() throws InterruptedException { final KafkaTemplate template = new KafkaTemplate<>(pf); // init consumer String group = "groupInARBP3"; - Map consumerProperties = KafkaTestUtils.consumerProps(group, "false", embeddedKafka); + Map consumerProperties = KafkaTestUtils.consumerProps(embeddedKafka, group, false); consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProperties); ContainerProperties containerProps = new ContainerProperties(topic10); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java index c587ac53a2..2018cbca47 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java @@ -707,7 +707,7 @@ public void onPartitionsAssigned(Collection partitions) { } }); - Map consumerProps = KafkaTestUtils.consumerProps(this.testName, "false", embeddedKafka); + Map consumerProps = KafkaTestUtils.consumerProps(embeddedKafka, this.testName, false); if (badDeser) { consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class); consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, BadDeser.class); @@ -731,7 +731,7 @@ public ReplyingKafkaTemplate createTemplate(TopicPartit throws InterruptedException { ContainerProperties containerProperties = new ContainerProperties(topic); - Map consumerProps = KafkaTestUtils.consumerProps(this.testName, "false", embeddedKafka); + Map consumerProps = KafkaTestUtils.consumerProps(embeddedKafka, this.testName, false); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProperties); @@ -751,7 +751,7 @@ public AggregatingReplyingKafkaTemplate aggregatingTemp ContainerProperties containerProperties = new ContainerProperties(topic); containerProperties.setAckMode(AckMode.MANUAL_IMMEDIATE); - Map consumerProps = KafkaTestUtils.consumerProps(this.testName, "false", embeddedKafka); + Map consumerProps = KafkaTestUtils.consumerProps(embeddedKafka, this.testName, false); DefaultKafkaConsumerFactory>> cf = new DefaultKafkaConsumerFactory<>(consumerProps); KafkaMessageListenerContainer>> container = @@ -887,7 +887,7 @@ public DefaultKafkaProducerFactory pf() { @Bean public DefaultKafkaConsumerFactory cf() { - Map consumerProps = KafkaTestUtils.consumerProps("serverSide", "false", this.embeddedKafka); + Map consumerProps = KafkaTestUtils.consumerProps(this.embeddedKafka, "serverSide", false); return new DefaultKafkaConsumerFactory<>(consumerProps); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicClassLevelIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicClassLevelIntegrationTests.java index 974434b567..0a059ce6fc 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicClassLevelIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicClassLevelIntegrationTests.java @@ -895,7 +895,7 @@ ConsumerFactory consumerFactory() { Map props = KafkaTestUtils.consumerProps( this.broker.getBrokersAsString(), "groupId", - "false"); + false); props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicScenarioTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicScenarioTests.java index 44c9862242..6fcbf38789 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicScenarioTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicScenarioTests.java @@ -1334,7 +1334,7 @@ ConsumerFactory consumerFactory() { Map props = KafkaTestUtils.consumerProps( this.broker.getBrokersAsString(), "groupId", - "false"); + false); return new DefaultKafkaConsumerFactory<>(props); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoFutureRetryTopicClassLevelIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoFutureRetryTopicClassLevelIntegrationTests.java index 64ef40cded..b58fabc917 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoFutureRetryTopicClassLevelIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoFutureRetryTopicClassLevelIntegrationTests.java @@ -894,7 +894,7 @@ ConsumerFactory consumerFactory() { Map props = KafkaTestUtils.consumerProps( this.broker.getBrokersAsString(), "groupId", - "false"); + false); props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoRetryTopicScenarioTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoRetryTopicScenarioTests.java index fff1e2c64a..238f28e202 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoRetryTopicScenarioTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoRetryTopicScenarioTests.java @@ -1329,7 +1329,7 @@ ConsumerFactory consumerFactory() { Map props = KafkaTestUtils.consumerProps( this.broker.getBrokersAsString(), "groupId", - "false"); + false); return new DefaultKafkaConsumerFactory<>(props); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DeliveryHeaderTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DeliveryHeaderTests.java index 0c4bac4498..c93fea1d34 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DeliveryHeaderTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DeliveryHeaderTests.java @@ -157,7 +157,7 @@ ConcurrentKafkaListenerContainerFactory kafkaListenerContainerF @Primary ConsumerFactory cf() { return new DefaultKafkaConsumerFactory<>( - KafkaTestUtils.consumerProps("dh1", "false", this.broker)); + KafkaTestUtils.consumerProps(this.broker, "dh1", false)); } @Bean diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DltStartupTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DltStartupTests.java index e1e91a85ea..4cc2162c15 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DltStartupTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DltStartupTests.java @@ -250,7 +250,7 @@ ConcurrentKafkaListenerContainerFactory cf2(ConsumerFactory cf(EmbeddedKafkaBroker broker) { - return new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps("dltStart", "false", broker)); + return new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps(broker, "dltStart", false)); } @Bean diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/PartitionResolverTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/PartitionResolverTests.java index 94c752fb01..5b04e735c4 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/PartitionResolverTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/PartitionResolverTests.java @@ -115,7 +115,7 @@ void listen(String in) { @Bean ConsumerFactory cf(EmbeddedKafkaBroker broker) { - Map props = KafkaTestUtils.consumerProps("prt", "false", broker); + Map props = KafkaTestUtils.consumerProps(broker, "prt", false); return new DefaultKafkaConsumerFactory<>(props); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationIntegrationTests.java index 3abd651735..a9715839cf 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationIntegrationTests.java @@ -126,7 +126,7 @@ KafkaListenerContainerFactory kafkaListenerContainerFactory(KafkaTemplate consumerFactory(EmbeddedKafkaBroker embeddedKafka) { return new DefaultKafkaConsumerFactory<>( - KafkaTestUtils.consumerProps("retryConfig", "false", embeddedKafka)); + KafkaTestUtils.consumerProps(embeddedKafka, "retryConfig", false)); } @Bean diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationManualAssignmentIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationManualAssignmentIntegrationTests.java index 87618b70cf..a1184bd69c 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationManualAssignmentIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationManualAssignmentIntegrationTests.java @@ -124,7 +124,7 @@ KafkaListenerContainerFactory kafkaListenerContainerFactory(KafkaTemplate consumerFactory(EmbeddedKafkaBroker embeddedKafka) { - Map props = KafkaTestUtils.consumerProps("retryConfig", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "retryConfig", false); props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 2000); return new DefaultKafkaConsumerFactory<>( props); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsBranchTests.java b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsBranchTests.java index 6dcb606c38..2fbf61f6c5 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsBranchTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsBranchTests.java @@ -112,7 +112,7 @@ public void testBranchingStream() { private Consumer createConsumer() { Map consumerProps = - KafkaTestUtils.consumerProps(UUID.randomUUID().toString(), "false", this.embeddedKafka); + KafkaTestUtils.consumerProps(this.embeddedKafka, UUID.randomUUID().toString(), false); consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000); DefaultKafkaConsumerFactory kafkaConsumerFactory = diff --git a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryServiceTests.java b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryServiceTests.java index aab8c1bf07..cc4ec210df 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryServiceTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryServiceTests.java @@ -226,8 +226,7 @@ public KafkaTemplate template() { @Bean public Map consumerConfigs() { - return KafkaTestUtils.consumerProps(this.brokerAddresses, "testGroup", - "false"); + return KafkaTestUtils.consumerProps(this.brokerAddresses, "testGroup", false); } @Bean diff --git a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsJsonSerializationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsJsonSerializationTests.java index 3e208f08dd..b70f4ef0e9 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsJsonSerializationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsJsonSerializationTests.java @@ -117,7 +117,7 @@ public void testJsonObjectSerialization() { private Consumer consumer(String topic, Serde keySerde, Serde valueSerde) { Map consumerProps = - KafkaTestUtils.consumerProps(UUID.randomUUID().toString(), "false", this.embeddedKafka); + KafkaTestUtils.consumerProps(this.embeddedKafka, UUID.randomUUID().toString(), false); consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000); DefaultKafkaConsumerFactory kafkaConsumerFactory = diff --git a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java index 0a71ced184..dec47db072 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java @@ -243,9 +243,7 @@ public KStream kStream(StreamsBuilder kStreamBuilder) { @Bean public Map consumerConfigs() { - Map consumerProps = KafkaTestUtils.consumerProps(this.brokerAddresses, "testGroup", - "false"); - return consumerProps; + return KafkaTestUtils.consumerProps(this.brokerAddresses, "testGroup", false); } @Bean diff --git a/spring-kafka/src/test/java/org/springframework/kafka/streams/RecoveringDeserializationExceptionHandlerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/streams/RecoveringDeserializationExceptionHandlerTests.java index 0a0e7d9052..ddc331ad96 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/streams/RecoveringDeserializationExceptionHandlerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/streams/RecoveringDeserializationExceptionHandlerTests.java @@ -215,8 +215,7 @@ public KStream kStream(StreamsBuilder kStreamBuilder) { @Bean public Map consumerConfigs() { - Map consumerProps = KafkaTestUtils.consumerProps(this.brokerAddresses, "recovererGroup", - "false"); + Map consumerProps = KafkaTestUtils.consumerProps(this.brokerAddresses, "recovererGroup", false); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); return consumerProps; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/MicrometerMetricsTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/MicrometerMetricsTests.java index 3da236b77e..77aa18b7ac 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/MicrometerMetricsTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/MicrometerMetricsTests.java @@ -111,7 +111,7 @@ ProducerFactory producerFactory(EmbeddedKafkaBroker broker) { @Bean ConsumerFactory consumerFactory(EmbeddedKafkaBroker broker) { return new DefaultKafkaConsumerFactory<>( - KafkaTestUtils.consumerProps("metrics", "false", broker)); + KafkaTestUtils.consumerProps(broker, "metrics", false)); } @Bean diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationIntegrationTests.java index 359b35d52a..137eab3b68 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationIntegrationTests.java @@ -131,7 +131,7 @@ ProducerFactory producerFactory(EmbeddedKafkaBroker broker) { @Bean ConsumerFactory consumerFactory(EmbeddedKafkaBroker broker) { - Map consumerProps = KafkaTestUtils.consumerProps("obs", "false", broker); + Map consumerProps = KafkaTestUtils.consumerProps(broker, "obs", false); return new DefaultKafkaConsumerFactory<>(consumerProps); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java index 78124e5bb0..00d1d0d21f 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java @@ -570,7 +570,7 @@ ProducerFactory customProducerFactory(EmbeddedKafkaBroker broke @Bean ConsumerFactory consumerFactory(EmbeddedKafkaBroker broker) { - Map consumerProps = KafkaTestUtils.consumerProps("obs", "false", broker); + Map consumerProps = KafkaTestUtils.consumerProps(broker, "obs", false); consumerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + "," + broker.getBrokersAsString() + "," + broker.getBrokersAsString()); return new DefaultKafkaConsumerFactory<>(consumerProps); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationIntegrationTests.java index 835bfa38f6..0a348d2627 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationIntegrationTests.java @@ -48,7 +48,7 @@ public class SerializationIntegrationTests { @Test void configurePreLoadedDelegates() { Map consumerProps = - KafkaTestUtils.consumerProps(DBTD_TOPIC, "false", EmbeddedKafkaCondition.getBroker()); + KafkaTestUtils.consumerProps(EmbeddedKafkaCondition.getBroker(), DBTD_TOPIC, false); consumerProps.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG, DBTD_TOPIC + ":" + TestDeserializer.class.getName()); TestDeserializer testDeser = new TestDeserializer();