Skip to content

Commit 9307dcd

Browse files
nklmishartembilan
authored andcommitted
GH-676: Allow to add topics into KafkaEmbedded
Fixes #676 (cherry picked from commit 1fa700a)
1 parent 550fc42 commit 9307dcd

File tree

2 files changed

+40
-12
lines changed

2 files changed

+40
-12
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@
2626
import java.util.Collection;
2727
import java.util.Collections;
2828
import java.util.HashMap;
29+
import java.util.HashSet;
2930
import java.util.List;
3031
import java.util.Map;
3132
import java.util.Properties;
33+
import java.util.Set;
3234
import java.util.concurrent.CountDownLatch;
3335
import java.util.concurrent.TimeUnit;
3436
import java.util.stream.Collectors;
@@ -77,6 +79,7 @@
7779
* @author Gary Russell
7880
* @author Kamill Sokol
7981
* @author Elliot Kennedy
82+
* @author Nakul Mishra
8083
*/
8184
public class KafkaEmbedded extends ExternalResource implements KafkaRule, InitializingBean, DisposableBean {
8285

@@ -119,7 +122,7 @@ public class KafkaEmbedded extends ExternalResource implements KafkaRule, Initia
119122

120123
private final boolean controlledShutdown;
121124

122-
private final String[] topics;
125+
private final Set<String> topics;
123126

124127
private final int partitionsPerTopic;
125128

@@ -161,10 +164,10 @@ public KafkaEmbedded(int count, boolean controlledShutdown, int partitions, Stri
161164
this.kafkaPorts = new int[this.count]; // random ports by default.
162165
this.controlledShutdown = controlledShutdown;
163166
if (topics != null) {
164-
this.topics = topics;
167+
this.topics = new HashSet<>(Arrays.asList(topics));
165168
}
166169
else {
167-
this.topics = new String[0];
170+
this.topics = new HashSet<>();
168171
}
169172
this.partitionsPerTopic = partitions;
170173
}
@@ -234,19 +237,18 @@ public void before() throws Exception { //NOSONAR
234237
this.kafkaPorts[i] = TestUtils.boundPort(server, SecurityProtocol.PLAINTEXT);
235238
}
236239
}
237-
addTopics(this.topics);
240+
createKafkaTopics(this.topics);
238241
System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, getBrokersAsString());
239242
System.setProperty(SPRING_EMBEDDED_ZOOKEEPER_CONNECT, getZookeeperConnectionString());
240243
}
241244

242245
/**
243-
* Add topics to the existing broker(s) using the configured number of partitions.
246+
* Create topics in the existing broker(s) using the configured number of partitions.
244247
* @param topics the topics.
245-
* @since 2.1
246248
*/
247-
public void addTopics(String... topics) {
249+
private void createKafkaTopics(Set<String> topics) {
248250
doWithAdmin(admin -> {
249-
List<NewTopic> newTopics = Arrays.stream(topics)
251+
List<NewTopic> newTopics = topics.stream()
250252
.map(t -> new NewTopic(t, this.partitionsPerTopic, (short) this.count))
251253
.collect(Collectors.toList());
252254
CreateTopicsResult createTopics = admin.createTopics(newTopics);
@@ -259,6 +261,18 @@ public void addTopics(String... topics) {
259261
});
260262
}
261263

264+
265+
/**
266+
* Add topics to the existing broker(s) using the configured number of partitions.
267+
* @param topics the topics.
268+
* @since 2.1
269+
*/
270+
public void addTopics(String... topics) {
271+
HashSet<String> set = new HashSet<>(Arrays.asList(topics));
272+
createKafkaTopics(set);
273+
this.topics.addAll(set);
274+
}
275+
262276
/**
263277
* Create an {@link AdminClient} invoke the callback and reliable close the
264278
* admin.
@@ -339,6 +353,10 @@ public void after() {
339353
}
340354
}
341355

356+
public Set<String> getTopics() {
357+
return new HashSet<>(this.topics);
358+
}
359+
342360
@Override
343361
public List<KafkaServer> getKafkaServers() {
344362
return this.kafkaServers;
@@ -456,7 +474,7 @@ public boolean isEmbedded() {
456474
* @throws Exception an exception.
457475
*/
458476
public void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer) throws Exception {
459-
consumeFromEmbeddedTopics(consumer, this.topics);
477+
consumeFromEmbeddedTopics(consumer, this.topics.toArray(new String[0]));
460478
}
461479

462480
/**
@@ -476,9 +494,11 @@ public void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, String topic) th
476494
* @throws Exception an exception.
477495
*/
478496
public void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, String... topics) throws Exception {
479-
for (String topic : topics) {
480-
assertThat(this.topics).as("topic '" + topic + "' is not in embedded topic list").contains(topic);
481-
}
497+
HashSet<String> diff = new HashSet<>(Arrays.asList(topics));
498+
diff.removeAll(new HashSet<>(this.topics));
499+
assertThat(this.topics)
500+
.as("topic(s):'" + diff + "' are not in embedded topic list")
501+
.containsAll(new HashSet<>(Arrays.asList(topics)));
482502
final CountDownLatch consumerLatch = new CountDownLatch(1);
483503
consumer.subscribe(Arrays.asList(topics), new ConsumerRebalanceListener() {
484504

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@
110110
* @author Dariusz Szablinski
111111
* @author Venil Noronha
112112
* @author Dimitri Penner
113+
* @author Nakul Mishra
113114
*/
114115
@ContextConfiguration
115116
@RunWith(SpringJUnit4ClassRunner.class)
@@ -642,6 +643,13 @@ public void testConverterBean() throws Exception {
642643
assertThat(this.config.listen16Message).isEqualTo("foobar");
643644
}
644645

646+
@Test
647+
public void testAddingTopics() {
648+
int count = embeddedKafka.getTopics().size();
649+
embeddedKafka.addTopics("testAddingTopics");
650+
assertThat(embeddedKafka.getTopics().size()).isEqualTo(count + 1);
651+
}
652+
645653
@Configuration
646654
@EnableKafka
647655
@EnableTransactionManagement(proxyTargetClass = true)

0 commit comments

Comments
 (0)