Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
* @author Hugo Wood
* @author Artem Bilan
* @author Sanghyeok An
* @author Mikhail Polivakha
*/
public final class KafkaTestUtils {

Expand All @@ -81,13 +82,30 @@ private KafkaTestUtils() {
* @param autoCommit the auto commit.
* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
* @return the properties.
* @deprecated please use {@link #consumerProps(EmbeddedKafkaBroker, String, boolean)} instead
*/
// Legacy parameter order retained for source compatibility only; delegates to the
// broker-address-based overload. Scheduled for removal — prefer
// consumerProps(EmbeddedKafkaBroker, String, boolean).
@Deprecated(forRemoval = true, since = "4.0.0")
public static Map<String, Object> consumerProps(String group, String autoCommit,
EmbeddedKafkaBroker embeddedKafka) {

return consumerProps(embeddedKafka.getBrokersAsString(), group, autoCommit);
}

/**
 * Set up test properties for an {@code <Integer, String>} consumer.
 *
 * @param embeddedKafka an {@link EmbeddedKafkaBroker} instance supplying the bootstrap servers.
 * @param group the group id.
 * @param autoCommit the auto commit.
 * @return the properties.
 * @since 4.0
 */
public static Map<String, Object> consumerProps(EmbeddedKafkaBroker embeddedKafka, String group,
boolean autoCommit) {

// Delegate to the broker-address-based overload using the embedded broker's address.
return consumerProps(embeddedKafka.getBrokersAsString(), group, autoCommit);
}

/**
* Set up test properties for an {@code <Integer, String>} consumer.
* @param brokers the bootstrapServers property.
Expand All @@ -96,7 +114,7 @@ public static Map<String, Object> consumerProps(String group, String autoCommit,
* @since 3.3
*/
public static Map<String, Object> consumerProps(String brokers, String group) {
return consumerProps(brokers, group, "false");
return consumerProps(brokers, group, false);
}

/**
Expand All @@ -114,8 +132,22 @@ public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafk
* @param group the group id.
* @param autoCommit the auto commit.
* @return the properties.
*/
* @deprecated Please use {@link #consumerProps(String, String, boolean)} instead.
*/
// String-based autoCommit retained for source compatibility only; parses the flag and
// delegates to the boolean overload. Scheduled for removal — prefer
// consumerProps(String, String, boolean).
@Deprecated(forRemoval = true, since = "4.0.0")
public static Map<String, Object> consumerProps(String brokers, String group, String autoCommit) {
return consumerProps(brokers, group, Boolean.parseBoolean(autoCommit));
}

/**
* Set up test properties for an {@code <Integer, String>} consumer.
* @param brokers the bootstrapServers property.
* @param group the group id.
* @param autoCommit the auto commit.
* @return the properties.
* @since 4.0
*/
public static Map<String, Object> consumerProps(String brokers, String group, boolean autoCommit) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
Expand Down Expand Up @@ -235,7 +267,7 @@ public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consume
public static ConsumerRecord<?, ?> getOneRecord(String brokerAddresses, String group, String topic, int partition,
boolean seekToLast, boolean commit, Duration timeout) {

Map<String, Object> consumerConfig = consumerProps(brokerAddresses, group, "false");
Map<String, Object> consumerConfig = consumerProps(brokerAddresses, group, false);
consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
try (KafkaConsumer consumer = new KafkaConsumer(consumerConfig)) {
TopicPartition topicPart = new TopicPartition(topic, partition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void testKafkaEmbedded() {

@Test
public void testLateStartedConsumer() {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(TEST_EMBEDDED, "false", this.broker);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(this.broker, TEST_EMBEDDED, false);
Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
this.broker.consumeFromAnEmbeddedTopic(consumer, TEST_EMBEDDED);

Expand All @@ -78,7 +78,7 @@ public void testLateStartedConsumer() {
producer.close();
KafkaTestUtils.getSingleRecord(consumer, TEST_EMBEDDED);

consumerProps = KafkaTestUtils.consumerProps("another" + TEST_EMBEDDED, "false", this.broker);
consumerProps = KafkaTestUtils.consumerProps(this.broker, "another" + TEST_EMBEDDED, false);
Consumer<Integer, String> consumer2 = new KafkaConsumer<>(consumerProps);
this.broker.consumeFromAnEmbeddedTopic(consumer2, TEST_EMBEDDED);
KafkaTestUtils.getSingleRecord(consumer2, TEST_EMBEDDED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ void testConsumeFromEmbeddedWithSeekToEnd() {
Map<String, Object> producerProps = KafkaTestUtils.producerProps(kafka);
KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps);
producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "beforeSeekToEnd"));
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("seekTest", "false", kafka);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(kafka, "seekTest", false);
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
kafka.consumeFromAnEmbeddedTopic(consumer, true /* seekToEnd */, "seekTestTopic");
producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "afterSeekToEnd"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ void testGetSingleWithMoreThanOneTopic(EmbeddedKafkaBroker broker) {
producer.send(new ProducerRecord<>("singleTopic1", 0, 1, "foo"));
producer.send(new ProducerRecord<>("singleTopic2", 0, 1, "foo"));
producer.close();
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("ktuTests1", "false", broker);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(broker, "ktuTests1", false);
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
broker.consumeFromAllEmbeddedTopics(consumer);
KafkaTestUtils.getSingleRecord(consumer, "singleTopic1");
Expand All @@ -72,7 +72,7 @@ void testGetSingleWithMoreThanOneTopicRecordNotThereYet(EmbeddedKafkaBroker brok
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps);
producer.send(new ProducerRecord<>("singleTopic4", 0, 1, "foo"));
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("ktuTests2", "false", broker);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(broker, "ktuTests2", false);
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
broker.consumeFromEmbeddedTopics(consumer, "singleTopic4", "singleTopic5");
long t1 = System.currentTimeMillis();
Expand Down Expand Up @@ -117,7 +117,7 @@ public void testMultiMinRecords(EmbeddedKafkaBroker broker) {
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps);
producer.send(new ProducerRecord<>("multiTopic1", 0, 1, "foo"));
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("ktuTests3", "false", broker);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(broker, "ktuTests3", false);
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
broker.consumeFromAnEmbeddedTopic(consumer, "multiTopic1");
new Thread(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,7 @@ public DefaultKafkaConsumerFactory<Integer, String> consumerFactory() {

@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> consumerProps =
KafkaTestUtils.consumerProps("myAliasGroup", "false", embeddedKafka());
return consumerProps;
return KafkaTestUtils.consumerProps(embeddedKafka(), "myAliasGroup", false);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ public DefaultKafkaConsumerFactory<String, String> consumerFactory(

@Bean
public Map<String, Object> consumerConfigs(EmbeddedKafkaBroker embeddedKafka) {
return KafkaTestUtils.consumerProps("test", "false", embeddedKafka);
return KafkaTestUtils.consumerProps(embeddedKafka, "test", false);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public DefaultKafkaConsumerFactory<Integer, Foo> consumerFactory() {
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> consumerProps =
KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false", this.embeddedKafka);
KafkaTestUtils.consumerProps(this.embeddedKafka, DEFAULT_TEST_GROUP_ID, false);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedFooProvider.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public DefaultKafkaConsumerFactory<Integer, Foo> consumerFactory(EmbeddedKafkaBr
@Bean
public Map<String, Object> consumerConfigs(EmbeddedKafkaBroker embeddedKafka) {
Map<String, Object> consumerProps =
KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false", embeddedKafka);
KafkaTestUtils.consumerProps(embeddedKafka, DEFAULT_TEST_GROUP_ID, false);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
consumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1000);
consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1555,7 +1555,7 @@ private ConsumerFactory<Integer, String> configuredConsumerFactory(String client

@Bean
public Map<String, Object> consumerConfigs() {
return KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false", this.embeddedKafka);
return KafkaTestUtils.consumerProps(this.embeddedKafka, DEFAULT_TEST_GROUP_ID, false);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ public void testNestedTxProducerIsCached() throws Exception {
DefaultKafkaProducerFactory<Integer, String> pfTx = new DefaultKafkaProducerFactory<>(producerProps);
pfTx.setTransactionIdPrefix("fooTx.");
KafkaOperations<Integer, String> templateTx = new KafkaTemplate<>(pfTx);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("txCache1Group", "false", this.embeddedKafka);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(this.embeddedKafka, "txCache1Group", false);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
AtomicReference<Consumer<Integer, String>> wrapped = new AtomicReference<>();
cf.addPostProcessor(consumer -> {
Expand Down Expand Up @@ -418,7 +418,7 @@ public void testNestedTxProducerIsFixed() throws Exception {
TransactionIdSuffixStrategy suffixStrategy = new DefaultTransactionIdSuffixStrategy(3);
pfTx.setTransactionIdSuffixStrategy(suffixStrategy);
KafkaOperations<Integer, String> templateTx = new KafkaTemplate<>(pfTx);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("txCache1FixedGroup", "false", this.embeddedKafka);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(this.embeddedKafka, "txCache1FixedGroup", false);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
AtomicReference<Consumer<Integer, String>> wrapped = new AtomicReference<>();
cf.addPostProcessor(consumer -> {
Expand Down Expand Up @@ -464,7 +464,7 @@ public void testNestedTxProducerIsFixed() throws Exception {
@ParameterizedTest
@ValueSource(booleans = { true, false })
void listener(boolean closeWithTimeout) {
Map<String, Object> consumerConfig = KafkaTestUtils.consumerProps("txCache1Group", "false", this.embeddedKafka);
Map<String, Object> consumerConfig = KafkaTestUtils.consumerProps(this.embeddedKafka, "txCache1Group", false);
consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "foo-0");
DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(consumerConfig);
List<String> adds = new ArrayList<>();
Expand Down Expand Up @@ -503,7 +503,7 @@ public void consumerRemoved(String id, Consumer consumer) {
void configDeserializer() {
Deserializer key = mock(Deserializer.class);
Deserializer value = mock(Deserializer.class);
Map<String, Object> config = KafkaTestUtils.consumerProps("mockGroup", "false", this.embeddedKafka);
Map<String, Object> config = KafkaTestUtils.consumerProps(this.embeddedKafka, "mockGroup", false);
DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(config, key, value);
Deserializer keyDeserializer = cf.getKeyDeserializer();
assertThat(keyDeserializer).isSameAs(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void producerRemoved(String id, Producer<String, String> producer) {
public static void setUp() {
embeddedKafka = EmbeddedKafkaCondition.getBroker();
Map<String, Object> consumerProps = KafkaTestUtils
.consumerProps("KafkaTemplatetests" + UUID.randomUUID(), "false", embeddedKafka);
.consumerProps(embeddedKafka, "KafkaTemplatetests" + UUID.randomUUID(), false);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
consumer = cf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, INT_KEY_TOPIC);
Expand Down Expand Up @@ -165,7 +165,7 @@ void testTemplate() {
template.setDefaultTopic(INT_KEY_TOPIC);

template.setConsumerFactory(
new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps("xx", "false", embeddedKafka)));
new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps(embeddedKafka, "xx", false)));
ConsumerRecords<Integer, String> initialRecords =
template.receive(Collections.singleton(new TopicPartitionOffset(INT_KEY_TOPIC, 1, 1L)));
assertThat(initialRecords).isEmpty();
Expand Down Expand Up @@ -475,7 +475,7 @@ void testTemplateDisambiguation() {
pf.setKeySerializer(new StringSerializer());
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic(STRING_KEY_TOPIC);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testTString", "false", embeddedKafka);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(embeddedKafka, "testTString", false);
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
cf.setKeyDeserializer(new StringDeserializer());
Consumer<String, String> localConsumer = cf.createConsumer();
Expand Down Expand Up @@ -630,7 +630,7 @@ void testReceiveWhenOffsetIsInvalid(Long offset) {
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);

template.setConsumerFactory(
new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps("xx", "false", embeddedKafka)));
new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps(embeddedKafka, "xx", false)));
TopicPartitionOffset tpoWithNullOffset = new TopicPartitionOffset(INT_KEY_TOPIC, 1, offset);

assertThatExceptionOfType(KafkaException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void testLocalTransaction() {
pf.setKeySerializer(new StringSerializer());
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(STRING_KEY_TOPIC);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testLocalTx", "false", embeddedKafka);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(embeddedKafka, "testLocalTx", false);
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
cf.setKeyDeserializer(new StringDeserializer());
Expand Down Expand Up @@ -173,7 +173,7 @@ public void testLocalTransactionIsFixed() {
pf.setTransactionIdSuffixStrategy(suffixStrategy);
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(STRING_KEY_TOPIC);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testLocalTxFixed", "false", embeddedKafka);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(embeddedKafka, "testLocalTxFixed", false);
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
cf.setKeyDeserializer(new StringDeserializer());
Expand Down Expand Up @@ -232,7 +232,7 @@ public void testGlobalTransaction() {
pf.setTransactionIdPrefix("my.transaction.");
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(STRING_KEY_TOPIC);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGlobalTx", "false", embeddedKafka);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(embeddedKafka, "testGlobalTx", false);
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
cf.setKeyDeserializer(new StringDeserializer());
Consumer<String, String> consumer = cf.createConsumer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public class ReactiveKafkaProducerTemplateIntegrationTests {
@BeforeAll
public static void setUpBeforeClass() {
Map<String, Object> consumerProps = KafkaTestUtils
.consumerProps("reactive_consumer_group", "false", EmbeddedKafkaCondition.getBroker());
.consumerProps(EmbeddedKafkaCondition.getBroker(), "reactive_consumer_group", false);
reactiveKafkaConsumerTemplate =
new ReactiveKafkaConsumerTemplate<>(setupReceiverOptionsWithDefaultTopic(consumerProps));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public class ReactiveKafkaProducerTemplateTransactionIntegrationTests {
@BeforeAll
public static void setUpBeforeClass() {
Map<String, Object> consumerProps =
KafkaTestUtils.consumerProps(CONSUMER_GROUP_ID, "false", EmbeddedKafkaCondition.getBroker());
KafkaTestUtils.consumerProps(EmbeddedKafkaCondition.getBroker(), CONSUMER_GROUP_ID, false);
reactiveKafkaConsumerTemplate =
new ReactiveKafkaConsumerTemplate<>(setupReceiverOptionsWithDefaultTopic(consumerProps));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFa

@Bean
ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps("test-group", "false", this.broker));
return new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps(this.broker, "test-group", false));
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerCon

@Bean
ConsumerFactory<Integer, String> consumerFactory(EmbeddedKafkaBroker broker) {
Map<String, Object> props = KafkaTestUtils.consumerProps("asaac.grp", "false", broker);
Map<String, Object> props = KafkaTestUtils.consumerProps(broker, "asaac.grp", false);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3);
return new DefaultKafkaConsumerFactory<>(
props);
Expand Down
Loading