Skip to content

Commit 4505874

Browse files
sobychackoartembilan
authored andcommitted
GH-548: StreamsBuilderFactoryBean enhancements
Fixes #548 * Make `StreamsConfig` customizable in the `StreamsBuilderFactoryBean` * Set the phase on `StreamsBuilderFactoryBean` to the `Integer.MAX_VALUE - 1000` * Adding tests * Addressing PR review comments * Addressing PR review comments
1 parent 3771e2a commit 4505874

File tree

2 files changed

+124
-4
lines changed

2 files changed

+124
-4
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/StreamsBuilderFactoryBean.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -36,24 +36,25 @@
3636
*
3737
* @author Artem Bilan
3838
* @author Ivan Ursul
39+
* @author Soby Chacko
3940
*
4041
* @since 1.1.4
4142
*/
4243
public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilder> implements SmartLifecycle {
4344

4445
private static final int DEFAULT_CLOSE_TIMEOUT = 10;
4546

46-
private final StreamsConfig streamsConfig;
47-
4847
private final CleanupConfig cleanupConfig;
4948

49+
private StreamsConfig streamsConfig;
50+
5051
private KafkaStreams kafkaStreams;
5152

5253
private KafkaClientSupplier clientSupplier = new DefaultKafkaClientSupplier();
5354

5455
private boolean autoStartup = true;
5556

56-
private int phase = Integer.MIN_VALUE;
57+
private int phase = Integer.MAX_VALUE - 1000;
5758

5859
private KafkaStreams.StateListener stateListener;
5960

@@ -63,6 +64,16 @@ public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilde
6364

6465
private volatile boolean running;
6566

67+
/**
68+
* Default constructor that creates the factory without a {@link StreamsConfig}.
69+
* It is the factory user's responsibility to properly set {@link StreamsConfig}
70+
* using {@link StreamsBuilderFactoryBean#setStreamsConfig(StreamsConfig)}
71+
* @since 2.1.3.
72+
*/
73+
public StreamsBuilderFactoryBean() {
74+
this.cleanupConfig = new CleanupConfig();
75+
}
76+
6677
/**
6778
* Construct an instance with the supplied streams configuration.
6879
* @param streamsConfig the streams configuration.
@@ -107,6 +118,16 @@ public StreamsBuilderFactoryBean(Map<String, Object> streamsConfig, CleanupConfi
107118
this.cleanupConfig = cleanupConfig;
108119
}
109120

121+
/**
122+
* Set {@link StreamsConfig} on this factory.
123+
* @param streamsConfig the streams configuration.
124+
* @since 2.1.3
125+
*/
126+
public void setStreamsConfig(StreamsConfig streamsConfig) {
127+
Assert.notNull(streamsConfig, "'streamsConfig' must not be null");
128+
this.streamsConfig = streamsConfig;
129+
}
130+
110131
public void setClientSupplier(KafkaClientSupplier clientSupplier) {
111132
Assert.notNull(clientSupplier, "'clientSupplier' must not be null");
112133
this.clientSupplier = clientSupplier; // NOSONAR (sync)
@@ -137,6 +158,9 @@ public Class<?> getObjectType() {
137158

138159
@Override
139160
protected StreamsBuilder createInstance() throws Exception {
161+
if (this.autoStartup) {
162+
Assert.notNull(this.streamsConfig, "'streamsConfig' must not be null");
163+
}
140164
return new StreamsBuilder();
141165
}
142166

@@ -166,6 +190,7 @@ public void stop(Runnable callback) {
166190
public synchronized void start() {
167191
if (!this.running) {
168192
try {
193+
Assert.notNull(this.streamsConfig, "'streamsConfig' must not be null");
169194
this.kafkaStreams = new KafkaStreams(getObject().build(), this.streamsConfig, this.clientSupplier);
170195
this.kafkaStreams.setStateListener(this.stateListener);
171196
this.kafkaStreams.setUncaughtExceptionHandler(this.exceptionHandler);
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.core;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.util.HashMap;
22+
import java.util.Map;
23+
24+
import org.apache.kafka.streams.StreamsConfig;
25+
import org.junit.Test;
26+
import org.junit.runner.RunWith;
27+
28+
import org.springframework.beans.factory.annotation.Autowired;
29+
import org.springframework.beans.factory.annotation.Value;
30+
import org.springframework.context.annotation.Bean;
31+
import org.springframework.context.annotation.Configuration;
32+
import org.springframework.kafka.KafkaException;
33+
import org.springframework.kafka.annotation.EnableKafka;
34+
import org.springframework.kafka.annotation.EnableKafkaStreams;
35+
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
36+
import org.springframework.kafka.test.context.EmbeddedKafka;
37+
import org.springframework.kafka.test.rule.KafkaEmbedded;
38+
import org.springframework.test.annotation.DirtiesContext;
39+
import org.springframework.test.context.junit4.SpringRunner;
40+
41+
/**
42+
* @author Soby Chacko
43+
*/
44+
@RunWith(SpringRunner.class)
45+
@DirtiesContext
46+
@EmbeddedKafka
47+
public class StreamsBuilderFactoryLateConfigTests {
48+
49+
private static final String APPLICATION_ID = "streamsBuilderFactoryLateConfigTests";
50+
51+
@Value("${" + KafkaEmbedded.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
52+
private String brokerAddresses;
53+
54+
@Autowired
55+
private StreamsBuilderFactoryBean streamsBuilderFactoryBean;
56+
57+
@Test(expected = KafkaException.class)
58+
public void testStreamBuilderFactoryCannotBeStartedWithoutStreamconfig() {
59+
StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean();
60+
streamsBuilderFactoryBean.start();
61+
}
62+
63+
@Test(expected = IllegalArgumentException.class)
64+
public void testStreamBuilderFactoryCannotBeInstantiatedWhenAutoStart() throws Exception {
65+
StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean();
66+
streamsBuilderFactoryBean.setAutoStartup(true);
67+
streamsBuilderFactoryBean.createInstance();
68+
}
69+
70+
@Test
71+
public void testStreamsBuilderFactoryWithConfigProvidedLater() {
72+
Map<String, Object> props = new HashMap<>();
73+
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
74+
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
75+
StreamsConfig streamsConfig = new StreamsConfig(props);
76+
streamsBuilderFactoryBean.setStreamsConfig(streamsConfig);
77+
78+
assertThat(streamsBuilderFactoryBean.isRunning()).isFalse();
79+
streamsBuilderFactoryBean.start();
80+
assertThat(streamsBuilderFactoryBean.isRunning()).isTrue();
81+
}
82+
83+
@Configuration
84+
@EnableKafka
85+
@EnableKafkaStreams
86+
public static class KafkaStreamsConfiguration {
87+
88+
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
89+
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder() {
90+
StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean();
91+
streamsBuilderFactoryBean.setAutoStartup(false);
92+
return streamsBuilderFactoryBean;
93+
}
94+
}
95+
}

0 commit comments

Comments
 (0)