Skip to content

Commit 6686e5a

Browse files
artembilangaryrussell
authored andcommitted
GH-598: ConsumerFactory default methods
Avoid implementations having to implement all methods. Resolve a couple TODOs Add default implementation for `ConsumerFactory.createConsumer()` methods Fixes #598
1 parent be073a8 commit 6686e5a

File tree

3 files changed

+21
-35
lines changed

3 files changed

+21
-35
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/ConsumerFactory.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,14 +28,17 @@
2828
* @param <V> the value type.
2929
*
3030
* @author Gary Russell
31+
* @author Artem Bilan
3132
*/
3233
public interface ConsumerFactory<K, V> {
3334

3435
/**
3536
* Create a consumer with the group id and client id as configured in the properties.
3637
* @return the consumer.
3738
*/
38-
Consumer<K, V> createConsumer();
39+
default Consumer<K, V> createConsumer() {
40+
return createConsumer(null);
41+
}
3942

4043
/**
4144
* Create a consumer, appending the suffix to the {@code client.id} property,
@@ -44,7 +47,9 @@ public interface ConsumerFactory<K, V> {
4447
* @return the consumer.
4548
* @since 1.3
4649
*/
47-
Consumer<K, V> createConsumer(String clientIdSuffix);
50+
default Consumer<K, V> createConsumer(String clientIdSuffix) {
51+
return createConsumer(null, clientIdSuffix);
52+
}
4853

4954
/**
5055
* Create a consumer with an explicit group id; in addition, the
@@ -55,24 +60,23 @@ public interface ConsumerFactory<K, V> {
5560
* @return the consumer.
5661
* @since 1.3
5762
*/
58-
Consumer<K, V> createConsumer(String groupId, String clientIdSuffix);
63+
default Consumer<K, V> createConsumer(String groupId, String clientIdSuffix) {
64+
return createConsumer(groupId, null, clientIdSuffix);
65+
}
5966

6067
/**
6168
* Create a consumer with an explicit group id; in addition, the
6269
* client id suffix is appended to the clientIdPrefix which overrides the
6370
* {@code client.id} property, if present.
6471
* If a factory does not implement this method, {@link #createConsumer(String, String)}
6572
* is invoked, ignoring the prefix.
66-
* TODO: remove default in 2.2
6773
* @param groupId the group id.
6874
* @param clientIdPrefix the prefix.
6975
* @param clientIdSuffix the suffix.
7076
* @return the consumer.
7177
* @since 2.1.1
7278
*/
73-
default Consumer<K, V> createConsumer(String groupId, String clientIdPrefix, String clientIdSuffix) {
74-
return createConsumer(groupId, clientIdSuffix);
75-
}
79+
Consumer<K, V> createConsumer(String groupId, String clientIdPrefix, String clientIdSuffix);
7680

7781
/**
7882
* Return true if consumers created by this factory use auto commit.

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java

Lines changed: 5 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -38,6 +38,7 @@
3838
*
3939
* @author Gary Russell
4040
* @author Murali Reddy
41+
* @author Artem Bilan
4142
*/
4243
public class DefaultKafkaConsumerFactory<K, V> implements ConsumerFactory<K, V> {
4344

@@ -82,44 +83,22 @@ public Deserializer<V> getValueDeserializer() {
8283
return this.valueDeserializer;
8384
}
8485

85-
@Override
86-
public Consumer<K, V> createConsumer() {
87-
return createKafkaConsumer();
88-
}
89-
90-
@Override
91-
public Consumer<K, V> createConsumer(String clientIdSuffix) {
92-
return createKafkaConsumer(null, clientIdSuffix);
93-
}
94-
95-
@Override
96-
public Consumer<K, V> createConsumer(String groupId, String clientIdSuffix) {
97-
return createKafkaConsumer(groupId, clientIdSuffix);
98-
}
99-
10086
@Override
10187
public Consumer<K, V> createConsumer(String groupId, String clientIdPrefix, String clientIdSuffix) {
10288
return createKafkaConsumer(groupId, clientIdPrefix, clientIdSuffix);
10389
}
10490

105-
protected KafkaConsumer<K, V> createKafkaConsumer() {
106-
return createKafkaConsumer(this.configs);
107-
}
108-
109-
protected KafkaConsumer<K, V> createKafkaConsumer(String groupId, String clientIdSuffix) {
110-
return createKafkaConsumer(groupId, null, clientIdSuffix);
111-
}
112-
11391
protected KafkaConsumer<K, V> createKafkaConsumer(String groupId, String clientIdPrefix,
11492
String clientIdSuffix) {
93+
11594
boolean overrideClientIdPrefix = StringUtils.hasText(clientIdPrefix);
11695
if (clientIdSuffix == null) {
11796
clientIdSuffix = "";
11897
}
11998
boolean shouldModifyClientId = (this.configs.containsKey(ConsumerConfig.CLIENT_ID_CONFIG)
12099
&& StringUtils.hasText(clientIdSuffix)) || overrideClientIdPrefix;
121100
if (groupId == null && !shouldModifyClientId) {
122-
return createKafkaConsumer();
101+
return createKafkaConsumer(this.configs);
123102
}
124103
else {
125104
Map<String, Object> modifiedConfigs = new HashMap<>(this.configs);
@@ -136,7 +115,7 @@ protected KafkaConsumer<K, V> createKafkaConsumer(String groupId, String clientI
136115
}
137116

138117
protected KafkaConsumer<K, V> createKafkaConsumer(Map<String, Object> configs) {
139-
return new KafkaConsumer<K, V>(configs, this.keyDeserializer, this.valueDeserializer);
118+
return new KafkaConsumer<>(configs, this.keyDeserializer, this.valueDeserializer);
140119
}
141120

142121
@Override

spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.springframework.kafka.support.KafkaHeaderMapper;
3535
import org.springframework.kafka.support.KafkaHeaders;
3636
import org.springframework.kafka.support.KafkaNull;
37+
import org.springframework.kafka.support.SimpleKafkaHeaderMapper;
3738
import org.springframework.messaging.Message;
3839
import org.springframework.messaging.MessageHeaders;
3940
import org.springframework.messaging.support.MessageBuilder;
@@ -65,7 +66,9 @@ public MessagingMessageConverter() {
6566
if (JacksonPresent.isJackson2Present()) {
6667
this.headerMapper = new DefaultKafkaHeaderMapper();
6768
}
68-
// TODO: In 2.2, default to SimpleKafkaHeaderMapper if no Jackson
69+
else {
70+
this.headerMapper = new SimpleKafkaHeaderMapper();
71+
}
6972
}
7073

7174
/**

0 commit comments

Comments
 (0)