Skip to content

Commit 94d4fce

Browse files
zacholausonartembilan
authored andcommitted
GH-555: Resolve PPs in EmbeddedKafka topics
Fixes #555 * Update `EmbeddedKafkaContextCustomizer` to resolve property placeholders * Update `KafkaStreamsTests` to use topic from property * Add property resolving details to `EmbeddedKafka` javadoc and testing.adoc * Simple polishing
1 parent fc6ae37 commit 94d4fce

File tree

4 files changed

+37
-18
lines changed

4 files changed

+37
-18
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -51,6 +51,7 @@
5151
*
5252
* @author Artem Bilan
5353
* @author Elliot Metsger
54+
* @author Zach Olauson
5455
*
5556
* @since 1.3
5657
*
@@ -85,6 +86,8 @@
8586
int partitions() default 2;
8687

8788
/**
89+
* Topics that should be created
90+
* Topics may contain property placeholders, e.g. {@code topics = "${kafka.topic.one:topicOne}"}
8891
* @return the topics to create
8992
*/
9093
String[] topics() default { };

spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
1919
import java.io.IOException;
2020
import java.io.InputStream;
2121
import java.io.StringReader;
22+
import java.util.Arrays;
2223
import java.util.Map;
2324
import java.util.Properties;
2425

@@ -40,6 +41,7 @@
4041
*
4142
* @author Artem Bilan
4243
* @author Elliot Metsger
44+
* @author Zach Olauson
4345
*
4446
* @since 1.3
4547
*/
@@ -57,13 +59,19 @@ public void customizeContext(ConfigurableApplicationContext context, MergedConte
5759
ConfigurableListableBeanFactory beanFactory = context.getBeanFactory();
5860
Assert.isInstanceOf(DefaultSingletonBeanRegistry.class, beanFactory);
5961

62+
ConfigurableEnvironment environment = context.getEnvironment();
63+
64+
String[] topics =
65+
Arrays.stream(this.embeddedKafka.topics())
66+
.map(environment::resolvePlaceholders)
67+
.toArray(String[]::new);
68+
6069
KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(this.embeddedKafka.count(),
6170
this.embeddedKafka.controlledShutdown(),
6271
this.embeddedKafka.partitions(),
63-
this.embeddedKafka.topics());
72+
topics);
6473

6574
Properties properties = new Properties();
66-
ConfigurableEnvironment environment = context.getEnvironment();
6775

6876
for (String pair : this.embeddedKafka.brokerProperties()) {
6977
if (!StringUtils.hasText(pair)) {

spring-kafka/src/test/java/org/springframework/kafka/kstream/KafkaStreamsTests.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.TimeUnit;
2727

2828
import org.apache.kafka.clients.consumer.ConsumerConfig;
29+
import org.apache.kafka.clients.consumer.ConsumerRecord;
2930
import org.apache.kafka.common.serialization.Serdes;
3031
import org.apache.kafka.streams.KafkaStreams;
3132
import org.apache.kafka.streams.KeyValue;
@@ -63,6 +64,7 @@
6364
import org.springframework.kafka.test.rule.KafkaEmbedded;
6465
import org.springframework.kafka.test.utils.KafkaTestUtils;
6566
import org.springframework.test.annotation.DirtiesContext;
67+
import org.springframework.test.context.TestPropertySource;
6668
import org.springframework.test.context.junit4.SpringRunner;
6769
import org.springframework.util.concurrent.SettableListenableFuture;
6870

@@ -71,15 +73,17 @@
7173
* @author Marius Bogoevici
7274
* @author Gary Russell
7375
* @author Elliot Metsger
76+
* @author Zach Olauson
7477
*
7578
* @since 1.1.4
7679
*/
7780
@RunWith(SpringRunner.class)
7881
@DirtiesContext
82+
@TestPropertySource(properties = "streaming.topic.two=streamingTopic2")
7983
@EmbeddedKafka(partitions = 1,
8084
topics = {
8185
KafkaStreamsTests.STREAMING_TOPIC1,
82-
KafkaStreamsTests.STREAMING_TOPIC2,
86+
"${streaming.topic.two}",
8387
KafkaStreamsTests.FOOS },
8488
brokerProperties = {
8589
"auto.create.topics.enable=${topics.autoCreate:false}",
@@ -89,22 +93,22 @@ public class KafkaStreamsTests {
8993

9094
static final String STREAMING_TOPIC1 = "streamingTopic1";
9195

92-
static final String STREAMING_TOPIC2 = "streamingTopic2";
93-
9496
static final String FOOS = "foos";
9597

9698
@Autowired
9799
private KafkaTemplate<Integer, String> kafkaTemplate;
98100

99101
@Autowired
100-
private SettableListenableFuture<String> resultFuture;
102+
private SettableListenableFuture<ConsumerRecord<?, String>> resultFuture;
101103

102104
@Autowired
103105
private StreamsBuilderFactoryBean streamsBuilderFactoryBean;
104106

105107
@Autowired
106108
private KafkaEmbedded kafkaEmbedded;
107109

110+
@Value("${streaming.topic.two}")
111+
private String streamingTopic2;
108112

109113
@Test
110114
public void testKStreams() throws Exception {
@@ -129,11 +133,12 @@ public void testKStreams() throws Exception {
129133
this.kafkaTemplate.sendDefault(0, payload2);
130134
this.kafkaTemplate.flush();
131135

132-
String result = resultFuture.get(600, TimeUnit.SECONDS);
136+
ConsumerRecord<?, String> result = resultFuture.get(600, TimeUnit.SECONDS);
133137

134138
assertThat(result).isNotNull();
135139

136-
assertThat(result).isEqualTo(payload.toUpperCase() + payload2.toUpperCase());
140+
assertThat(result.topic()).isEqualTo(streamingTopic2);
141+
assertThat(result.value()).isEqualTo(payload.toUpperCase() + payload2.toUpperCase());
137142

138143
assertThat(stateLatch.await(10, TimeUnit.SECONDS)).isTrue();
139144

@@ -152,6 +157,9 @@ public static class KafkaStreamsConfiguration {
152157
@Value("${" + KafkaEmbedded.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
153158
private String brokerAddresses;
154159

160+
@Value("${streaming.topic.two}")
161+
private String streamingTopic2;
162+
155163
@Bean
156164
public ProducerFactory<Integer, String> producerFactory() {
157165
return new DefaultKafkaProducerFactory<>(producerConfigs());
@@ -196,7 +204,7 @@ public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
196204
.reduce((value1, value2) -> value1 + value2, Materialized.as("windowStore"))
197205
.toStream()
198206
.map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
199-
.filter((i, s) -> s.length() > 40).to(STREAMING_TOPIC2);
207+
.filter((i, s) -> s.length() > 40).to(streamingTopic2);
200208

201209
stream.print(Printed.toSysOut());
202210

@@ -224,12 +232,12 @@ public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer,
224232
}
225233

226234
@Bean
227-
public SettableListenableFuture<String> resultFuture() {
235+
public SettableListenableFuture<ConsumerRecord<?, String>> resultFuture() {
228236
return new SettableListenableFuture<>();
229237
}
230238

231-
@KafkaListener(topics = STREAMING_TOPIC2)
232-
public void listener(String payload) {
239+
@KafkaListener(topics = "${streaming.topic.two}")
240+
public void listener(ConsumerRecord<?, String> payload) {
233241
resultFuture().set(payload);
234242
}
235243

src/reference/asciidoc/testing.adoc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,17 +150,17 @@ public class KafkaStreamsTests {
150150
}
151151
----
152152

153-
The `brokerProperties` and `brokerPropertiesLocation` attributes of `@EmbeddedKafka` support property placeholder resolutions:
153+
The `topics`, `brokerProperties`, and `brokerPropertiesLocation` attributes of `@EmbeddedKafka` support property placeholder resolutions:
154154
[source, java]
155155
----
156156
@TestPropertySource(locations = "classpath:/test.properties")
157-
@EmbeddedKafka(topics = "any-topic",
157+
@EmbeddedKafka(topics = { "any-topic", "${kafka.topics.another-topic}" },
158158
brokerProperties = { "log.dir=${kafka.broker.logs-dir}",
159159
"listeners=PLAINTEXT://localhost:${kafka.broker.port}",
160160
"auto.create.topics.enable=${kafka.broker.topics-enable:true}" }
161161
brokerPropertiesLocation = "classpath:/broker.properties")
162162
----
163-
In the example above, the property placeholders `${kafka.broker.logs-dir}` and `${kafka.broker.port}` are resolved from the Spring `Environment`.
163+
In the example above, the property placeholders `${kafka.topics.another-topic}`, `${kafka.broker.logs-dir}`, and `${kafka.broker.port}` are resolved from the Spring `Environment`.
164164
In addition the broker properties are loaded from the `broker.properties` classpath resource specified by the `brokerPropertiesLocation`.
165165
Property placeholders are resolved for the `brokerPropertiesLocation` URL and for any property placeholders found in the resource.
166166
Properties defined by `brokerProperties` override properties found in `brokerPropertiesLocation`.

0 commit comments

Comments
 (0)