diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java index 41fbc57284..826dbe6076 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2017 the original author or authors. + * Copyright 2015-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -72,6 +72,7 @@ * @author Artem Bilan * @author Gary Russell * @author Kamill Sokol + * @author Elliot Kennedy */ public class KafkaEmbedded extends ExternalResource implements KafkaRule, InitializingBean, DisposableBean { @@ -370,23 +371,7 @@ public boolean isEmbedded() { * @throws Exception an exception. */ public void consumeFromAllEmbeddedTopics(Consumer consumer) throws Exception { - final CountDownLatch consumerLatch = new CountDownLatch(1); - consumer.subscribe(Arrays.asList(this.topics), new ConsumerRebalanceListener() { - - @Override - public void onPartitionsRevoked(Collection partitions) { - } - - @Override - public void onPartitionsAssigned(Collection partitions) { - consumerLatch.countDown(); - } - - }); - consumer.poll(0); // force assignment - assertThat(consumerLatch.await(30, TimeUnit.SECONDS)) - .as("Failed to be assigned partitions from the embedded topics") - .isTrue(); + consumeFromEmbeddedTopics(consumer, this.topics); } /** @@ -409,6 +394,7 @@ public void consumeFromEmbeddedTopics(Consumer consumer, String... topics) for (String topic : topics) { assertThat(this.topics).as("topic '" + topic + "' is not in embedded topic list").contains(topic); } + final CountDownLatch consumerLatch = new CountDownLatch(1); consumer.subscribe(Arrays.asList(topics), new ConsumerRebalanceListener() { @Override @@ -417,12 +403,17 @@ public void onPartitionsRevoked(Collection partitions) { @Override public void onPartitionsAssigned(Collection partitions) { + consumerLatch.countDown(); if (logger.isDebugEnabled()) { logger.debug("partitions assigned: " + partitions); } } }); + consumer.poll(0); // force assignment + assertThat(consumerLatch.await(30, TimeUnit.SECONDS)) + .as("Failed to be assigned partitions from the embedded topics") + .isTrue(); logger.debug("Subscription Initiated"); } diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/hamcrest/AddressableEmbeddedBrokerTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/rule/AddressableEmbeddedBrokerTests.java similarity index 92% rename from spring-kafka-test/src/test/java/org/springframework/kafka/test/hamcrest/AddressableEmbeddedBrokerTests.java rename to spring-kafka-test/src/test/java/org/springframework/kafka/test/rule/AddressableEmbeddedBrokerTests.java index 596a570995..894a3e8c4e 100644 --- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/hamcrest/AddressableEmbeddedBrokerTests.java +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/rule/AddressableEmbeddedBrokerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017 the original author or authors. + * Copyright 2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.kafka.test.hamcrest; +package org.springframework.kafka.test.rule; import static org.assertj.core.api.Assertions.assertThat; @@ -29,12 +29,12 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.test.rule.KafkaEmbedded; import org.springframework.test.context.junit4.SpringRunner; /** * @author Gary Russell * @author Kamill Sokol + * @author Elliot Kennedy * @since 1.3 * */ diff --git a/spring-kafka/src/test/java/org/springframework/kafka/kstream/KafkaStreamsBranchTests.java b/spring-kafka/src/test/java/org/springframework/kafka/kstream/KafkaStreamsBranchTests.java new file mode 100644 index 0000000000..8605b7ee13 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/kstream/KafkaStreamsBranchTests.java @@ -0,0 +1,179 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.kstream; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.Consumed; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Produced; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.EnableKafkaStreams; +import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.rule.KafkaEmbedded; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit4.SpringRunner; + +/** + * @author Elliot Kennedy + */ +@RunWith(SpringRunner.class) +@DirtiesContext +@EmbeddedKafka(partitions = 1, + topics = { + KafkaStreamsBranchTests.TRUE_TOPIC, + KafkaStreamsBranchTests.FALSE_TOPIC, + KafkaStreamsBranchTests.TRUE_FALSE_INPUT_TOPIC }, + brokerProperties = { + "auto.create.topics.enable=${topics.autoCreate:false}", + "delete.topic.enable=${topic.delete:true}"}, + brokerPropertiesLocation = "classpath:/${broker.filename:broker}.properties") +public class KafkaStreamsBranchTests { + + public static final String TRUE_TOPIC = "true-output-topic"; + public static final String FALSE_TOPIC = "false-output-topic"; + public static final String TRUE_FALSE_INPUT_TOPIC = "input-topic"; + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Autowired + private KafkaEmbedded kafkaEmbedded; + + @Test + public void testBranchingStream() throws Exception { + Consumer falseConsumer = createConsumer(); + kafkaEmbedded.consumeFromAnEmbeddedTopic(falseConsumer, FALSE_TOPIC); + + Consumer trueConsumer = createConsumer(); + kafkaEmbedded.consumeFromAnEmbeddedTopic(trueConsumer, TRUE_TOPIC); + + kafkaTemplate.send(TRUE_FALSE_INPUT_TOPIC, UUID.randomUUID().toString(), String.valueOf(true)); + kafkaTemplate.send(TRUE_FALSE_INPUT_TOPIC, UUID.randomUUID().toString(), String.valueOf(true)); + kafkaTemplate.send(TRUE_FALSE_INPUT_TOPIC, UUID.randomUUID().toString(), String.valueOf(false)); + + ConsumerRecords trueRecords = KafkaTestUtils.getRecords(trueConsumer); + ConsumerRecords falseRecords = KafkaTestUtils.getRecords(falseConsumer); + + List trueValues = new ArrayList<>(); + trueRecords.forEach(trueRecord -> trueValues.add(trueRecord.value())); + + List falseValues = new ArrayList<>(); + falseRecords.forEach(falseRecord -> falseValues.add(falseRecord.value())); + + assertThat(trueValues).containsExactly("true", "true"); + assertThat(falseValues).containsExactly("false"); + } + + private Consumer createConsumer() { + Map consumerProps = KafkaTestUtils.consumerProps(UUID.randomUUID().toString(), "false", kafkaEmbedded); + consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000); + + DefaultKafkaConsumerFactory kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps, new StringDeserializer(), new StringDeserializer()); + return kafkaConsumerFactory.createConsumer(); + } + + @Configuration + @EnableKafka + @EnableKafkaStreams + public static class Config { + + @Value("${" + KafkaEmbedded.SPRING_EMBEDDED_KAFKA_BROKERS + "}") + private String brokerAddresses; + + @Bean + public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(producerConfigs()); + } + + @Bean + public Map producerConfigs() { + return KafkaTestUtils.senderProps(this.brokerAddresses); + } + + @Bean + public KafkaTemplate template() { + KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory(), true); + kafkaTemplate.setDefaultTopic(TRUE_FALSE_INPUT_TOPIC); + return kafkaTemplate; + } + + @Bean + public KafkaTemplate kafkaTemplate() { + Map senderProperties = KafkaTestUtils.senderProps(this.brokerAddresses); + senderProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + senderProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + ProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(senderProperties); + return new KafkaTemplate<>(producerFactory); + } + + @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) + public StreamsConfig kStreamsConfigs() { + Map props = new HashMap<>(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + return new StreamsConfig(props); + } + + @Bean + @SuppressWarnings("unchecked") + public KStream trueFalseStream(StreamsBuilder streamsBuilder) { + KStream trueFalseStream = streamsBuilder + .stream(TRUE_FALSE_INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String())); + + KStream[] branches = trueFalseStream.branch((key, value) -> String.valueOf(true).equals(value), (key, value) -> String.valueOf(false).equals(value)); + + branches[0].to(TRUE_TOPIC, Produced.with(Serdes.String(), Serdes.String())); + branches[1].to(FALSE_TOPIC, Produced.with(Serdes.String(), Serdes.String())); + + return trueFalseStream; + } + + } + +}