From 2903fce606289351a76ca488ddb8ec130db66809 Mon Sep 17 00:00:00 2001 From: Elliot Kennedy Date: Sun, 21 Jan 2018 15:57:10 +0000 Subject: [PATCH 1/3] Wait for partitions to be assigned when consuming from all embedded topics. --- .../kafka/test/rule/KafkaEmbedded.java | 24 ++++++------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java index 41fbc57284..b10db92ccb 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java @@ -370,23 +370,7 @@ public boolean isEmbedded() { * @throws Exception an exception. */ public void consumeFromAllEmbeddedTopics(Consumer consumer) throws Exception { - final CountDownLatch consumerLatch = new CountDownLatch(1); - consumer.subscribe(Arrays.asList(this.topics), new ConsumerRebalanceListener() { - - @Override - public void onPartitionsRevoked(Collection partitions) { - } - - @Override - public void onPartitionsAssigned(Collection partitions) { - consumerLatch.countDown(); - } - - }); - consumer.poll(0); // force assignment - assertThat(consumerLatch.await(30, TimeUnit.SECONDS)) - .as("Failed to be assigned partitions from the embedded topics") - .isTrue(); + consumeFromEmbeddedTopics(consumer, this.topics); } /** @@ -409,6 +393,7 @@ public void consumeFromEmbeddedTopics(Consumer consumer, String... 
topics) for (String topic : topics) { assertThat(this.topics).as("topic '" + topic + "' is not in embedded topic list").contains(topic); } + final CountDownLatch consumerLatch = new CountDownLatch(1); consumer.subscribe(Arrays.asList(topics), new ConsumerRebalanceListener() { @Override @@ -417,12 +402,17 @@ public void onPartitionsRevoked(Collection partitions) { @Override public void onPartitionsAssigned(Collection partitions) { + consumerLatch.countDown(); if (logger.isDebugEnabled()) { logger.debug("partitions assigned: " + partitions); } } }); + consumer.poll(0); // force assignment + assertThat(consumerLatch.await(30, TimeUnit.SECONDS)) + .as("Failed to be assigned partitions from the embedded topics") + .isTrue(); logger.debug("Subscription Initiated"); } From 0a3ea64fd36cbaf8b84d51b220f595cc9e58c7db Mon Sep 17 00:00:00 2001 From: Elliot Kennedy Date: Tue, 23 Jan 2018 00:57:01 +0000 Subject: [PATCH 2/3] GH-539 - Wait for partitions to be assigned when consuming from embedded topics - Add a test to consume from embedded topics. - Updated authors and copyright.
--- build.gradle | 2 + .../kafka/test/rule/KafkaEmbedded.java | 3 +- .../AddressableEmbeddedBrokerTests.java | 6 +- .../test/rule/KafkaEmbeddedStreamTests.java | 180 ++++++++++++++++++ 4 files changed, 187 insertions(+), 4 deletions(-) rename spring-kafka-test/src/test/java/org/springframework/kafka/test/{hamcrest => rule}/AddressableEmbeddedBrokerTests.java (92%) create mode 100644 spring-kafka-test/src/test/java/org/springframework/kafka/test/rule/KafkaEmbeddedStreamTests.java diff --git a/build.gradle b/build.gradle index afdff4dcb8..8c331e55d7 100644 --- a/build.gradle +++ b/build.gradle @@ -211,6 +211,8 @@ project ('spring-kafka-test') { compile ("org.hamcrest:hamcrest-all:$hamcrestVersion", optional) compile ("org.assertj:assertj-core:$assertjVersion", optional) + + testCompile project(':spring-kafka') } } diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java index b10db92ccb..826dbe6076 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2017 the original author or authors. + * Copyright 2015-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -72,6 +72,7 @@ * @author Artem Bilan * @author Gary Russell * @author Kamill Sokol + * @author Elliot Kennedy */ public class KafkaEmbedded extends ExternalResource implements KafkaRule, InitializingBean, DisposableBean { diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/hamcrest/AddressableEmbeddedBrokerTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/rule/AddressableEmbeddedBrokerTests.java similarity index 92% rename from spring-kafka-test/src/test/java/org/springframework/kafka/test/hamcrest/AddressableEmbeddedBrokerTests.java rename to spring-kafka-test/src/test/java/org/springframework/kafka/test/rule/AddressableEmbeddedBrokerTests.java index 596a570995..894a3e8c4e 100644 --- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/hamcrest/AddressableEmbeddedBrokerTests.java +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/rule/AddressableEmbeddedBrokerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017 the original author or authors. + * Copyright 2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -package org.springframework.kafka.test.hamcrest; +package org.springframework.kafka.test.rule; import static org.assertj.core.api.Assertions.assertThat; @@ -29,12 +29,12 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.test.rule.KafkaEmbedded; import org.springframework.test.context.junit4.SpringRunner; /** * @author Gary Russell * @author Kamill Sokol + * @author Elliot Kennedy * @since 1.3 * */ diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/rule/KafkaEmbeddedStreamTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/rule/KafkaEmbeddedStreamTests.java new file mode 100644 index 0000000000..ec9578ed1e --- /dev/null +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/rule/KafkaEmbeddedStreamTests.java @@ -0,0 +1,180 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.springframework.kafka.test.rule; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.Consumed; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Produced; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.FactoryBean; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.core.StreamsBuilderFactoryBean; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.test.context.junit4.SpringRunner; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Elliot Kennedy + */ +@RunWith(SpringRunner.class) +public class KafkaEmbeddedStreamTests { + + private static final String TRUE_TOPIC = "true-output-topic"; + private static final String FALSE_TOPIC = "false-output-topic"; + private static final 
String TRUE_FALSE_INPUT_TOPIC = "input-topic"; + + @ClassRule + public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, false, TRUE_FALSE_INPUT_TOPIC, FALSE_TOPIC, TRUE_TOPIC); + + private KafkaTemplate kafkaTemplate; + + @Before + public void setup() throws Exception { + kafkaTemplate = createKafkaTemplate(); + } + + @Test + public void testConsumeFromAnEmbeddedTopic() throws Exception { + Consumer falseConsumer = createConsumer(); + embeddedKafka.consumeFromAnEmbeddedTopic(falseConsumer, FALSE_TOPIC); + + Consumer trueConsumer = createConsumer(); + embeddedKafka.consumeFromAnEmbeddedTopic(trueConsumer, TRUE_TOPIC); + + kafkaTemplate.send(TRUE_FALSE_INPUT_TOPIC, UUID.randomUUID().toString(), String.valueOf(true)); + kafkaTemplate.send(TRUE_FALSE_INPUT_TOPIC, UUID.randomUUID().toString(), String.valueOf(true)); + kafkaTemplate.send(TRUE_FALSE_INPUT_TOPIC, UUID.randomUUID().toString(), String.valueOf(false)); + + List trueMessages = consumeAll(trueConsumer); + List falseMessages = consumeAll(falseConsumer); + + assertThat(trueMessages).containsExactlyInAnyOrder("true", "true"); + assertThat(falseMessages).containsExactlyInAnyOrder("false"); + } + + @Test + public void testConsumeFromEmbeddedTopics() throws Exception { + Consumer trueAndFalseConsumer = createConsumer(); + embeddedKafka.consumeFromEmbeddedTopics(trueAndFalseConsumer, FALSE_TOPIC, TRUE_TOPIC); + + kafkaTemplate.send(TRUE_FALSE_INPUT_TOPIC, UUID.randomUUID().toString(), String.valueOf(true)); + kafkaTemplate.send(TRUE_FALSE_INPUT_TOPIC, UUID.randomUUID().toString(), String.valueOf(true)); + kafkaTemplate.send(TRUE_FALSE_INPUT_TOPIC, UUID.randomUUID().toString(), String.valueOf(false)); + + List trueAndFalseMessages = consumeAll(trueAndFalseConsumer); + + assertThat(trueAndFalseMessages).containsExactlyInAnyOrder("true", "true", "false"); + } + + @Test + public void testConsumeFromAllEmbeddedTopics() throws Exception { + Consumer consumeFromAllTopicsConsumer = createConsumer(); + 
embeddedKafka.consumeFromAllEmbeddedTopics(consumeFromAllTopicsConsumer); + + kafkaTemplate.send(TRUE_FALSE_INPUT_TOPIC, UUID.randomUUID().toString(), String.valueOf(true)); + kafkaTemplate.send(TRUE_FALSE_INPUT_TOPIC, UUID.randomUUID().toString(), String.valueOf(false)); + + List allMessages = consumeAll(consumeFromAllTopicsConsumer); + + assertThat(allMessages).containsExactlyInAnyOrder("true", "false", "true", "false"); + } + + private List consumeAll(Consumer consumer) { + List allMessages = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + ConsumerRecords records = consumer.poll(10L); + records.forEach(record -> allMessages.add(record.value())); + } + return allMessages; + } + + private Consumer createConsumer() { + Map consumerProps = KafkaTestUtils.consumerProps(UUID.randomUUID().toString(), "false", embeddedKafka); + consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000); + + DefaultKafkaConsumerFactory kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps, new StringDeserializer(), new StringDeserializer()); + return kafkaConsumerFactory.createConsumer(); + } + + private KafkaTemplate createKafkaTemplate() { + Map senderProperties = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()); + senderProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + senderProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + ProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(senderProperties); + return new KafkaTemplate<>(producerFactory); + } + + @Configuration + @EnableKafka + public static class Config { + + @Value("${spring.embedded.kafka.brokers}") + private String brokers; + + @Bean + public FactoryBean streamsBuilder() { + Map props = new HashMap<>(); + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0L); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app"); + return new 
StreamsBuilderFactoryBean(new StreamsConfig(props)); + } + + @Bean + public KStream trueFalseStream(StreamsBuilder streamsBuilder) { + KStream trueFalseStream = streamsBuilder + .stream(TRUE_FALSE_INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String())); + + KStream[] branches = trueFalseStream.branch( + (key, value) -> String.valueOf(true).equals(value), + (key, value) -> String.valueOf(false).equals(value)); + + branches[0].to(TRUE_TOPIC, Produced.with(Serdes.String(), Serdes.String())); + branches[1].to(FALSE_TOPIC, Produced.with(Serdes.String(), Serdes.String())); + + return trueFalseStream; + } + } + +} From d661d97067e877503f7a3242c90ca43df9353b29 Mon Sep 17 00:00:00 2001 From: Elliot Kennedy Date: Tue, 23 Jan 2018 02:31:29 +0000 Subject: [PATCH 3/3] GH-539 - Wait for partitions to be assigned when consuming from embedded topics - Move test and use spring test utils better. --- build.gradle | 2 - .../test/rule/KafkaEmbeddedStreamTests.java | 180 ------------------ .../kstream/KafkaStreamsBranchTests.java | 179 +++++++++++++++++ 3 files changed, 179 insertions(+), 182 deletions(-) delete mode 100644 spring-kafka-test/src/test/java/org/springframework/kafka/test/rule/KafkaEmbeddedStreamTests.java create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/kstream/KafkaStreamsBranchTests.java diff --git a/build.gradle b/build.gradle index 8c331e55d7..afdff4dcb8 100644 --- a/build.gradle +++ b/build.gradle @@ -211,8 +211,6 @@ project ('spring-kafka-test') { compile ("org.hamcrest:hamcrest-all:$hamcrestVersion", optional) compile ("org.assertj:assertj-core:$assertjVersion", optional) - - testCompile project(':spring-kafka') } } diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/rule/KafkaEmbeddedStreamTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/rule/KafkaEmbeddedStreamTests.java deleted file mode 100644 index ec9578ed1e..0000000000 --- 
a/spring-kafka-test/src/test/java/org/springframework/kafka/test/rule/KafkaEmbeddedStreamTests.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Copyright 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.kafka.test.rule; - -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.Consumed; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.Produced; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.FactoryBean; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.annotation.EnableKafka; -import org.springframework.kafka.core.DefaultKafkaConsumerFactory; -import org.springframework.kafka.core.DefaultKafkaProducerFactory; 
-import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.core.StreamsBuilderFactoryBean; -import org.springframework.kafka.test.utils.KafkaTestUtils; -import org.springframework.test.context.junit4.SpringRunner; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import static org.assertj.core.api.Assertions.assertThat; - -/** - * @author Elliot Kennedy - */ -@RunWith(SpringRunner.class) -public class KafkaEmbeddedStreamTests { - - private static final String TRUE_TOPIC = "true-output-topic"; - private static final String FALSE_TOPIC = "false-output-topic"; - private static final String TRUE_FALSE_INPUT_TOPIC = "input-topic"; - - @ClassRule - public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, false, TRUE_FALSE_INPUT_TOPIC, FALSE_TOPIC, TRUE_TOPIC); - - private KafkaTemplate kafkaTemplate; - - @Before - public void setup() throws Exception { - kafkaTemplate = createKafkaTemplate(); - } - - @Test - public void testConsumeFromAnEmbeddedTopic() throws Exception { - Consumer falseConsumer = createConsumer(); - embeddedKafka.consumeFromAnEmbeddedTopic(falseConsumer, FALSE_TOPIC); - - Consumer trueConsumer = createConsumer(); - embeddedKafka.consumeFromAnEmbeddedTopic(trueConsumer, TRUE_TOPIC); - - kafkaTemplate.send(TRUE_FALSE_INPUT_TOPIC, UUID.randomUUID().toString(), String.valueOf(true)); - kafkaTemplate.send(TRUE_FALSE_INPUT_TOPIC, UUID.randomUUID().toString(), String.valueOf(true)); - kafkaTemplate.send(TRUE_FALSE_INPUT_TOPIC, UUID.randomUUID().toString(), String.valueOf(false)); - - List trueMessages = consumeAll(trueConsumer); - List falseMessages = consumeAll(falseConsumer); - - assertThat(trueMessages).containsExactlyInAnyOrder("true", "true"); - assertThat(falseMessages).containsExactlyInAnyOrder("false"); - } - - @Test - public void testConsumeFromEmbeddedTopics() throws Exception { 
- Consumer trueAndFalseConsumer = createConsumer(); - embeddedKafka.consumeFromEmbeddedTopics(trueAndFalseConsumer, FALSE_TOPIC, TRUE_TOPIC); - - kafkaTemplate.send(TRUE_FALSE_INPUT_TOPIC, UUID.randomUUID().toString(), String.valueOf(true)); - kafkaTemplate.send(TRUE_FALSE_INPUT_TOPIC, UUID.randomUUID().toString(), String.valueOf(true)); - kafkaTemplate.send(TRUE_FALSE_INPUT_TOPIC, UUID.randomUUID().toString(), String.valueOf(false)); - - List trueAndFalseMessages = consumeAll(trueAndFalseConsumer); - - assertThat(trueAndFalseMessages).containsExactlyInAnyOrder("true", "true", "false"); - } - - @Test - public void testConsumeFromAllEmbeddedTopics() throws Exception { - Consumer consumeFromAllTopicsConsumer = createConsumer(); - embeddedKafka.consumeFromAllEmbeddedTopics(consumeFromAllTopicsConsumer); - - kafkaTemplate.send(TRUE_FALSE_INPUT_TOPIC, UUID.randomUUID().toString(), String.valueOf(true)); - kafkaTemplate.send(TRUE_FALSE_INPUT_TOPIC, UUID.randomUUID().toString(), String.valueOf(false)); - - List allMessages = consumeAll(consumeFromAllTopicsConsumer); - - assertThat(allMessages).containsExactlyInAnyOrder("true", "false", "true", "false"); - } - - private List consumeAll(Consumer consumer) { - List allMessages = new ArrayList<>(); - for (int i = 0; i < 100; i++) { - ConsumerRecords records = consumer.poll(10L); - records.forEach(record -> allMessages.add(record.value())); - } - return allMessages; - } - - private Consumer createConsumer() { - Map consumerProps = KafkaTestUtils.consumerProps(UUID.randomUUID().toString(), "false", embeddedKafka); - consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000); - - DefaultKafkaConsumerFactory kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps, new StringDeserializer(), new StringDeserializer()); - return kafkaConsumerFactory.createConsumer(); - } - - private KafkaTemplate createKafkaTemplate() { - Map senderProperties = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()); - 
senderProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - senderProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - - ProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(senderProperties); - return new KafkaTemplate<>(producerFactory); - } - - @Configuration - @EnableKafka - public static class Config { - - @Value("${spring.embedded.kafka.brokers}") - private String brokers; - - @Bean - public FactoryBean streamsBuilder() { - Map props = new HashMap<>(); - props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0L); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app"); - return new StreamsBuilderFactoryBean(new StreamsConfig(props)); - } - - @Bean - public KStream trueFalseStream(StreamsBuilder streamsBuilder) { - KStream trueFalseStream = streamsBuilder - .stream(TRUE_FALSE_INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String())); - - KStream[] branches = trueFalseStream.branch( - (key, value) -> String.valueOf(true).equals(value), - (key, value) -> String.valueOf(false).equals(value)); - - branches[0].to(TRUE_TOPIC, Produced.with(Serdes.String(), Serdes.String())); - branches[1].to(FALSE_TOPIC, Produced.with(Serdes.String(), Serdes.String())); - - return trueFalseStream; - } - } - -} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/kstream/KafkaStreamsBranchTests.java b/spring-kafka/src/test/java/org/springframework/kafka/kstream/KafkaStreamsBranchTests.java new file mode 100644 index 0000000000..8605b7ee13 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/kstream/KafkaStreamsBranchTests.java @@ -0,0 +1,179 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.kstream; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.Consumed; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Produced; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.EnableKafkaStreams; +import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import 
org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.rule.KafkaEmbedded; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit4.SpringRunner; + +/** + * @author Elliot Kennedy + */ +@RunWith(SpringRunner.class) +@DirtiesContext +@EmbeddedKafka(partitions = 1, + topics = { + KafkaStreamsBranchTests.TRUE_TOPIC, + KafkaStreamsBranchTests.FALSE_TOPIC, + KafkaStreamsBranchTests.TRUE_FALSE_INPUT_TOPIC }, + brokerProperties = { + "auto.create.topics.enable=${topics.autoCreate:false}", + "delete.topic.enable=${topic.delete:true}"}, + brokerPropertiesLocation = "classpath:/${broker.filename:broker}.properties") +public class KafkaStreamsBranchTests { + + public static final String TRUE_TOPIC = "true-output-topic"; + public static final String FALSE_TOPIC = "false-output-topic"; + public static final String TRUE_FALSE_INPUT_TOPIC = "input-topic"; + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Autowired + private KafkaEmbedded kafkaEmbedded; + + @Test + public void testBranchingStream() throws Exception { + Consumer falseConsumer = createConsumer(); + kafkaEmbedded.consumeFromAnEmbeddedTopic(falseConsumer, FALSE_TOPIC); + + Consumer trueConsumer = createConsumer(); + kafkaEmbedded.consumeFromAnEmbeddedTopic(trueConsumer, TRUE_TOPIC); + + kafkaTemplate.send(TRUE_FALSE_INPUT_TOPIC, UUID.randomUUID().toString(), String.valueOf(true)); + kafkaTemplate.send(TRUE_FALSE_INPUT_TOPIC, UUID.randomUUID().toString(), String.valueOf(true)); + kafkaTemplate.send(TRUE_FALSE_INPUT_TOPIC, UUID.randomUUID().toString(), String.valueOf(false)); + + ConsumerRecords trueRecords = KafkaTestUtils.getRecords(trueConsumer); + ConsumerRecords falseRecords = KafkaTestUtils.getRecords(falseConsumer); + + List trueValues = new 
ArrayList<>(); + trueRecords.forEach(trueRecord -> trueValues.add(trueRecord.value())); + + List falseValues = new ArrayList<>(); + falseRecords.forEach(falseRecord -> falseValues.add(falseRecord.value())); + + assertThat(trueValues).containsExactly("true", "true"); + assertThat(falseValues).containsExactly("false"); + } + + private Consumer createConsumer() { + Map consumerProps = KafkaTestUtils.consumerProps(UUID.randomUUID().toString(), "false", kafkaEmbedded); + consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000); + + DefaultKafkaConsumerFactory kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps, new StringDeserializer(), new StringDeserializer()); + return kafkaConsumerFactory.createConsumer(); + } + + @Configuration + @EnableKafka + @EnableKafkaStreams + public static class Config { + + @Value("${" + KafkaEmbedded.SPRING_EMBEDDED_KAFKA_BROKERS + "}") + private String brokerAddresses; + + @Bean + public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(producerConfigs()); + } + + @Bean + public Map producerConfigs() { + return KafkaTestUtils.senderProps(this.brokerAddresses); + } + + @Bean + public KafkaTemplate template() { + KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory(), true); + kafkaTemplate.setDefaultTopic(TRUE_FALSE_INPUT_TOPIC); + return kafkaTemplate; + } + + @Bean + public KafkaTemplate kafkaTemplate() { + Map senderProperties = KafkaTestUtils.senderProps(this.brokerAddresses); + senderProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + senderProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + ProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(senderProperties); + return new KafkaTemplate<>(producerFactory); + } + + @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) + public StreamsConfig kStreamsConfigs() { + Map props = new HashMap<>(); + 
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + return new StreamsConfig(props); + } + + @Bean + @SuppressWarnings("unchecked") + public KStream trueFalseStream(StreamsBuilder streamsBuilder) { + KStream trueFalseStream = streamsBuilder + .stream(TRUE_FALSE_INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String())); + + KStream[] branches = trueFalseStream.branch((key, value) -> String.valueOf(true).equals(value), (key, value) -> String.valueOf(false).equals(value)); + + branches[0].to(TRUE_TOPIC, Produced.with(Serdes.String(), Serdes.String())); + branches[1].to(FALSE_TOPIC, Produced.with(Serdes.String(), Serdes.String())); + + return trueFalseStream; + } + + } + +}