Skip to content

Commit 9f724ea

Browse files
garyrussellartembilan
authored andcommitted
GH-640: Upgrade kafka version to 1.1.0
Resolves #640 Polishing - Logging - remove unnecessary excludes - add slf4j over log4j2 - add logging to -test module tests - suppress some noise from embedded kafka Resolves #278 * Ignore `StreamsBuilderFactoryBeanTests` on Windows * Close remaining dangling `KafkaConsumer`s in tests
1 parent b048aaa commit 9f724ea

File tree

13 files changed

+357
-168
lines changed

13 files changed

+357
-168
lines changed

build.gradle

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,10 @@ subprojects { subproject ->
7878
junitJupiterVersion = '5.1.0'
7979
junitPlatformVersion = '1.1.0'
8080
junitVintageVersion = '5.1.0'
81-
kafkaVersion = '1.0.1'
81+
kafkaVersion = '1.1.0'
82+
log4jVersion = '2.11.0'
8283
mockitoVersion = '2.15.0'
8384
scalaVersion = '2.11'
84-
slf4jVersion = '1.7.25'
8585
springRetryVersion = '1.2.2.RELEASE'
8686
springVersion = '5.0.4.RELEASE'
8787
springDataCommonsVersion = '2.0.4.RELEASE'
@@ -107,6 +107,9 @@ subprojects { subproject ->
107107

108108
// To avoid compiler warnings about @API annotations in JUnit code
109109
testCompileOnly 'org.apiguardian:apiguardian-api:1.0.0'
110+
111+
testRuntime "org.apache.logging.log4j:log4j-core:$log4jVersion"
112+
testRuntime "org.apache.logging.log4j:log4j-slf4j-impl:$log4jVersion"
110113
}
111114

112115
// enable all compiler warnings; individual projects may customize further
@@ -177,14 +180,8 @@ project ('spring-kafka') {
177180
compile "org.springframework:spring-messaging:$springVersion"
178181
compile "org.springframework:spring-tx:$springVersion"
179182
compile "org.springframework.retry:spring-retry:$springRetryVersion"
180-
compile ("org.apache.kafka:kafka-clients:$kafkaVersion") {
181-
exclude group: 'org.slf4j', module: 'slf4j-api'
182-
}
183-
compile ("org.apache.kafka:kafka-streams:$kafkaVersion") {
184-
optional it
185-
exclude group: 'org.slf4j', module: 'slf4j-api'
186-
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
187-
}
183+
compile "org.apache.kafka:kafka-clients:$kafkaVersion"
184+
compile ("org.apache.kafka:kafka-streams:$kafkaVersion", optional)
188185

189186
compile ("com.fasterxml.jackson.core:jackson-core:$jacksonVersion", optional)
190187
compile ("com.fasterxml.jackson.core:jackson-databind:$jacksonVersion", optional)
@@ -196,8 +193,6 @@ project ('spring-kafka') {
196193
testCompile project (":spring-kafka-test")
197194
testCompile "org.assertj:assertj-core:$assertjVersion"
198195
testCompile "org.springframework:spring-tx:$springVersion"
199-
200-
testRuntime "org.slf4j:slf4j-log4j12:$slf4jVersion"
201196
}
202197
}
203198

@@ -211,13 +206,8 @@ project ('spring-kafka-test') {
211206

212207
compile ("org.apache.kafka:kafka-clients:$kafkaVersion:test")
213208

214-
compile ("org.apache.kafka:kafka_$scalaVersion:$kafkaVersion") {
215-
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
216-
}
217-
218-
compile ("org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test") {
219-
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
220-
}
209+
compile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion"
210+
compile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test"
221211

222212
compile ("junit:junit:$junit4Version") {
223213
exclude group: 'org.hamcrest', module: 'hamcrest-core'
@@ -228,6 +218,7 @@ project ('spring-kafka-test') {
228218

229219
compile ("org.hamcrest:hamcrest-all:$hamcrestVersion", optional)
230220
compile ("org.assertj:assertj-core:$assertjVersion", optional)
221+
compile ("org.apache.logging.log4j:log4j-core:$log4jVersion", optional)
231222
}
232223
}
233224

spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import org.apache.kafka.clients.consumer.ConsumerRecords;
4747
import org.apache.kafka.common.TopicPartition;
4848
import org.apache.kafka.common.security.auth.SecurityProtocol;
49-
import org.apache.kafka.common.utils.AppInfoParser;
5049
import org.apache.kafka.common.utils.Time;
5150
import org.junit.rules.ExternalResource;
5251

@@ -89,27 +88,27 @@ public class KafkaEmbedded extends ExternalResource implements KafkaRule, Initia
8988

9089
public static final long METADATA_PROPAGATION_TIMEOUT = 10000L;
9190

92-
private static final String clientVersion;
91+
// private static final String clientVersion;
9392

9493
private static final Method testUtilsCreateBrokerConfigMethod;
9594

9695
static {
97-
clientVersion = AppInfoParser.getVersion();
98-
if (clientVersion.startsWith("1.1.")) {
99-
try {
100-
testUtilsCreateBrokerConfigMethod = TestUtils.class.getDeclaredMethod("createBrokerConfig",
101-
int.class, String.class, boolean.class, boolean.class, int.class,
102-
scala.Option.class, scala.Option.class, scala.Option.class,
103-
boolean.class, boolean.class, int.class, boolean.class, int.class, boolean.class,
104-
int.class, scala.Option.class, int.class, boolean.class);
105-
}
106-
catch (NoSuchMethodException | SecurityException e) {
107-
throw new RuntimeException("Failed to determine TestUtils.createBrokerConfig() method");
108-
}
109-
}
110-
else {
96+
// clientVersion = AppInfoParser.getVersion();
97+
// if (clientVersion.startsWith("1.1.")) {
98+
// try {
99+
// testUtilsCreateBrokerConfigMethod = TestUtils.class.getDeclaredMethod("createBrokerConfig",
100+
// int.class, String.class, boolean.class, boolean.class, int.class,
101+
// scala.Option.class, scala.Option.class, scala.Option.class,
102+
// boolean.class, boolean.class, int.class, boolean.class, int.class, boolean.class,
103+
// int.class, scala.Option.class, int.class, boolean.class);
104+
// }
105+
// catch (NoSuchMethodException | SecurityException e) {
106+
// throw new RuntimeException("Failed to determine TestUtils.createBrokerConfig() method");
107+
// }
108+
// }
109+
// else {
111110
testUtilsCreateBrokerConfigMethod = null;
112-
}
111+
// }
113112
}
114113

115114
private final int count;
@@ -222,6 +221,8 @@ public void before() throws Exception { //NOSONAR
222221
brokerConfigProperties.setProperty(KafkaConfig.ReplicaSocketTimeoutMsProp(), "1000");
223222
brokerConfigProperties.setProperty(KafkaConfig.ControllerSocketTimeoutMsProp(), "1000");
224223
brokerConfigProperties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
224+
brokerConfigProperties.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp(),
225+
String.valueOf(Long.MAX_VALUE));
225226
if (this.brokerProperties != null) {
226227
this.brokerProperties.forEach(brokerConfigProperties::put);
227228
}
@@ -251,7 +252,7 @@ public Properties createBrokerProperties(int i) {
251252
scala.Option.apply(null),
252253
scala.Option.apply(null),
253254
scala.Option.apply(null),
254-
true, false, 0, false, 0, false, 0, scala.Option.apply(null), 1);
255+
true, false, 0, false, 0, false, 0, scala.Option.apply(null), 1, false);
255256
}
256257
else {
257258
try {
Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.test.rule;
18+
19+
import java.util.Arrays;
20+
import java.util.HashMap;
21+
import java.util.Map;
22+
import java.util.stream.Stream;
23+
24+
import org.apache.commons.logging.Log;
25+
import org.apache.commons.logging.LogFactory;
26+
import org.apache.logging.log4j.Level;
27+
import org.apache.logging.log4j.LogManager;
28+
import org.apache.logging.log4j.core.LoggerContext;
29+
import org.apache.logging.log4j.core.config.Configuration;
30+
import org.apache.logging.log4j.core.config.LoggerConfig;
31+
import org.junit.rules.MethodRule;
32+
import org.junit.runners.model.FrameworkMethod;
33+
import org.junit.runners.model.Statement;
34+
35+
import org.springframework.util.Assert;
36+
import org.springframework.util.ObjectUtils;
37+
38+
/**
39+
* A JUnit method {@link org.junit.Rule} that changes the Log4J 2 logger level for a set of classes
40+
* or packages while a test method is running. Useful for performance or scalability tests
41+
* where we don't want to generate a large log in a tight inner loop, or
42+
* enabling debug logging for a test case.
43+
*
44+
* @author Artem Bilan
45+
*
46+
* @since 2.2
47+
*
48+
*/
49+
public final class Log4j2LevelAdjuster implements MethodRule {
50+
51+
private static final Log logger = LogFactory.getLog(Log4j2LevelAdjuster.class);
52+
53+
private final Class<?>[] classes;
54+
55+
private final Level level;
56+
57+
private final String[] categories;
58+
59+
private Log4j2LevelAdjuster(Level level) {
60+
this(level, null, new String[] { "org.springframework.integration" });
61+
}
62+
63+
private Log4j2LevelAdjuster(Level level, Class<?>[] classes, String[] categories) {
64+
Assert.notNull(level, "'level' must be null");
65+
this.level = level;
66+
this.classes = classes != null ? classes : new Class<?>[0];
67+
68+
Stream<String> categoryStream = Stream.of(getClass().getPackage().getName());
69+
70+
if (!ObjectUtils.isEmpty(categories)) {
71+
categoryStream = Stream.concat(Arrays.stream(categories), categoryStream);
72+
}
73+
74+
this.categories = categoryStream.toArray(String[]::new);
75+
}
76+
77+
@Override
78+
public Statement apply(final Statement base, final FrameworkMethod method, Object target) {
79+
return new Statement() {
80+
81+
@Override
82+
public void evaluate() throws Throwable {
83+
LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
84+
Configuration config = ctx.getConfiguration();
85+
86+
Map<Class<?>, Level> classLevels = new HashMap<>();
87+
for (Class<?> cls : Log4j2LevelAdjuster.this.classes) {
88+
String className = cls.getName();
89+
LoggerConfig loggerConfig = config.getLoggerConfig(className);
90+
LoggerConfig specificConfig = loggerConfig;
91+
92+
// We need a specific configuration for this logger,
93+
// otherwise we would change the level of all other loggers
94+
// having the original configuration as parent as well
95+
96+
if (!loggerConfig.getName().equals(className)) {
97+
specificConfig = new LoggerConfig(className, Log4j2LevelAdjuster.this.level, true);
98+
specificConfig.setParent(loggerConfig);
99+
config.addLogger(className, specificConfig);
100+
}
101+
102+
classLevels.put(cls, specificConfig.getLevel());
103+
specificConfig.setLevel(Log4j2LevelAdjuster.this.level);
104+
}
105+
106+
Map<String, Level> categoryLevels = new HashMap<>();
107+
for (String category : Log4j2LevelAdjuster.this.categories) {
108+
LoggerConfig loggerConfig = config.getLoggerConfig(category);
109+
LoggerConfig specificConfig = loggerConfig;
110+
111+
// We need a specific configuration for this logger,
112+
// otherwise we would change the level of all other loggers
113+
// having the original configuration as parent as well
114+
115+
if (!loggerConfig.getName().equals(category)) {
116+
specificConfig = new LoggerConfig(category, Log4j2LevelAdjuster.this.level, true);
117+
specificConfig.setParent(loggerConfig);
118+
config.addLogger(category, specificConfig);
119+
}
120+
121+
categoryLevels.put(category, specificConfig.getLevel());
122+
specificConfig.setLevel(Log4j2LevelAdjuster.this.level);
123+
}
124+
125+
ctx.updateLoggers();
126+
127+
logger.debug("++++++++++++++++++++++++++++ "
128+
+ "Overridden log level setting for: " + Arrays.toString(Log4j2LevelAdjuster.this.classes)
129+
+ " and " + Arrays.toString(Log4j2LevelAdjuster.this.categories)
130+
+ " for test " + method.getName());
131+
132+
try {
133+
base.evaluate();
134+
}
135+
finally {
136+
logger.debug("++++++++++++++++++++++++++++ "
137+
+ "Restoring log level setting for: " + Arrays.toString(Log4j2LevelAdjuster.this.classes)
138+
+ " and " + Arrays.toString(Log4j2LevelAdjuster.this.categories)
139+
+ " for test " + method.getName());
140+
141+
for (Class<?> cls : Log4j2LevelAdjuster.this.classes) {
142+
LoggerConfig loggerConfig = config.getLoggerConfig(cls.getName());
143+
loggerConfig.setLevel(classLevels.get(cls));
144+
}
145+
146+
for (String category : Log4j2LevelAdjuster.this.categories) {
147+
LoggerConfig loggerConfig = config.getLoggerConfig(category);
148+
loggerConfig.setLevel(categoryLevels.get(category));
149+
}
150+
151+
ctx.updateLoggers();
152+
}
153+
}
154+
155+
};
156+
}
157+
158+
/**
159+
* Specify the classes for logging level adjusting configured before.
160+
* A new copy Log4j2LevelAdjuster instance is produced by this method.
161+
* The provided classes parameter overrides existing value in the {@link #classes}.
162+
* @param classes the classes to use for logging level adjusting
163+
* @return a Log4j2LevelAdjuster copy with the provided classes
164+
*/
165+
public Log4j2LevelAdjuster classes(Class<?>... classes) {
166+
return classes(false, classes);
167+
}
168+
169+
/**
170+
* Specify the classes for logging level adjusting configured before.
171+
* A new copy Log4j2LevelAdjuster instance is produced by this method.
172+
* The provided classes parameter can be merged with existing value in the {@link #classes}.
173+
* @param merge to merge or not with previously configured {@link #classes}
174+
* @param classes the classes to use for logging level adjusting
175+
* @return a Log4j2LevelAdjuster copy with the provided classes
176+
* @since 5.0.2
177+
*/
178+
public Log4j2LevelAdjuster classes(boolean merge, Class<?>... classes) {
179+
return new Log4j2LevelAdjuster(this.level,
180+
merge ? Stream.of(this.classes, classes).flatMap(Stream::of).toArray(Class<?>[]::new) : classes,
181+
this.categories);
182+
}
183+
184+
/**
185+
* Specify the categories for logging level adjusting configured before.
186+
* A new copy Log4j2LevelAdjuster instance is produced by this method.
187+
* The provided categories parameter overrides existing value in the {@link #categories}.
188+
* @param categories the categories to use for logging level adjusting
189+
* @return a Log4j2LevelAdjuster copy with the provided categories
190+
*/
191+
public Log4j2LevelAdjuster categories(String... categories) {
192+
return categories(false, categories);
193+
}
194+
195+
/**
196+
* Specify the categories for logging level adjusting configured before.
197+
* A new copy Log4j2LevelAdjuster instance is produced by this method.
198+
* The provided categories parameter can be merged with existing value in the {@link #categories}.
199+
* @param merge to merge or not with previously configured {@link #categories}
200+
* @param categories the categories to use for logging level adjusting
201+
* @return a Log4j2LevelAdjuster copy with the provided categories
202+
* @since 5.0.2
203+
*/
204+
public Log4j2LevelAdjuster categories(boolean merge, String... categories) {
205+
return new Log4j2LevelAdjuster(this.level, this.classes,
206+
merge ? Stream.of(this.categories, categories).flatMap(Stream::of).toArray(String[]::new) : categories);
207+
}
208+
209+
/**
210+
* The factory to produce Log4j2LevelAdjuster instances for {@link Level#TRACE} logging
211+
* with the {@code org.springframework.integration} as default category.
212+
* @return the Log4j2LevelAdjuster instance
213+
*/
214+
public static Log4j2LevelAdjuster trace() {
215+
return forLevel(Level.TRACE);
216+
}
217+
218+
/**
219+
* The factory to produce Log4j2LevelAdjuster instances for {@link Level#DEBUG} logging
220+
* with the {@code org.springframework.integration} as default category.
221+
* @return the Log4j2LevelAdjuster instance
222+
*/
223+
public static Log4j2LevelAdjuster debug() {
224+
return forLevel(Level.DEBUG);
225+
}
226+
227+
/**
228+
* The factory to produce Log4j2LevelAdjuster instances for {@link Level#INFO} logging
229+
* with the {@code org.springframework.integration} as default category.
230+
* @return the Log4j2LevelAdjuster instance
231+
*/
232+
public static Log4j2LevelAdjuster info() {
233+
return forLevel(Level.INFO);
234+
}
235+
236+
/**
237+
* The factory to produce Log4j2LevelAdjuster instances for arbitrary logging {@link Level}
238+
* with the {@code org.springframework.integration} as default category.
239+
* @param level the {@link Level} to use for logging
240+
* @return the Log4j2LevelAdjuster instance
241+
*/
242+
public static Log4j2LevelAdjuster forLevel(Level level) {
243+
return new Log4j2LevelAdjuster(level);
244+
}
245+
246+
}

0 commit comments

Comments
 (0)