Skip to content

Commit b669c12

Browse files
frosieregaryrussell
authored andcommitted
GH-2070: Producer Factory Improvements
Resolves #2070 Update the DefaultKafkaProducerFactory to support producer configs overrides Restored the original copyright Added missing Java doc
1 parent 917134d commit b669c12

File tree

2 files changed

+56
-22
lines changed

2 files changed

+56
-22
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -691,17 +691,7 @@ private Producer<K, V> getOrCreateThreadBoundProducer() {
691691
* @return the producer.
692692
*/
693693
protected Producer<K, V> createKafkaProducer() {
694-
Map<String, Object> newConfigs;
695-
if (this.clientIdPrefix == null) {
696-
newConfigs = new HashMap<>(this.configs);
697-
}
698-
else {
699-
newConfigs = new HashMap<>(this.configs);
700-
newConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
701-
this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
702-
}
703-
checkBootstrap(newConfigs);
704-
return createRawProducer(newConfigs);
694+
return createRawProducer(getProducerConfigs());
705695
}
706696

707697
protected Producer<K, V> createTransactionalProducerForPartition() {
@@ -826,17 +816,7 @@ boolean cacheReturner(CloseSafeProducer<K, V> producerToRemove, Duration timeout
826816

827817
private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
828818
BiPredicate<CloseSafeProducer<K, V>, Duration> remover) {
829-
830-
Producer<K, V> newProducer;
831-
Map<String, Object> newProducerConfigs = new HashMap<>(this.configs);
832-
String txId = prefix + suffix;
833-
newProducerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, txId);
834-
if (this.clientIdPrefix != null) {
835-
newProducerConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
836-
this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
837-
}
838-
checkBootstrap(newProducerConfigs);
839-
newProducer = createRawProducer(newProducerConfigs);
819+
Producer<K, V> newProducer = createRawProducer(getTxProducerConfigs(prefix + suffix));
840820
try {
841821
newProducer.initTransactions();
842822
}
@@ -908,6 +888,35 @@ public void closeThreadBoundProducer() {
908888
}
909889
}
910890

891+
/**
892+
* Return the configuration of a producer.
893+
* @return the configuration of a producer.
894+
* @since 2.8.3
895+
* @see #createKafkaProducer()
896+
*/
897+
protected Map<String, Object> getProducerConfigs() {
898+
final Map<String, Object> newProducerConfigs = new HashMap<>(this.configs);
899+
checkBootstrap(newProducerConfigs);
900+
if (this.clientIdPrefix != null) {
901+
newProducerConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
902+
this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
903+
}
904+
return newProducerConfigs;
905+
}
906+
907+
/**
908+
* Return the configuration of a transactional producer.
909+
* @param transactionId the transactionId.
910+
* @return the configuration of a transactional producer.
911+
* @since 2.8.3
912+
* @see #doCreateTxProducer(String, String, BiPredicate)
913+
*/
914+
protected Map<String, Object> getTxProducerConfigs(String transactionId) {
915+
final Map<String, Object> newProducerConfigs = getProducerConfigs();
916+
newProducerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);
917+
return newProducerConfigs;
918+
}
919+
911920
/**
912921
* A wrapper class for the delegate.
913922
*

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.concurrent.atomic.AtomicInteger;
4242

4343
import org.apache.kafka.clients.producer.Callback;
44+
import org.apache.kafka.clients.producer.MockProducer;
4445
import org.apache.kafka.clients.producer.Producer;
4546
import org.apache.kafka.clients.producer.ProducerConfig;
4647
import org.apache.kafka.common.KafkaException;
@@ -499,4 +500,28 @@ void configSerializer() {
499500
verify(value).configure(any(), eq(false));
500501
}
501502

503+
@Test
504+
void testConfigOverridesOfTransactionalProducer() {
505+
final Map<String, Object> producerFactoryConfigs = Map.of("linger.ms", 100);
506+
final Map<String, Object> producerConfigs = new HashMap<>();
507+
final DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(producerFactoryConfigs) {
508+
@Override
509+
protected Map<String, Object> getTxProducerConfigs(String transactionId) {
510+
final Map<String, Object> newProducerConfigs = super.getTxProducerConfigs(transactionId);
511+
newProducerConfigs.put("linger.ms", 200);
512+
return newProducerConfigs;
513+
}
514+
515+
@Override
516+
protected Producer<String, String> createRawProducer(Map<String, Object> rawConfigs) {
517+
producerConfigs.putAll(rawConfigs);
518+
return new MockProducer<>();
519+
}
520+
};
521+
pf.setTransactionIdPrefix("tx-prefix");
522+
pf.createProducer();
523+
assertThat(producerFactoryConfigs).containsEntry("linger.ms", 100);
524+
assertThat(producerConfigs).containsEntry("linger.ms", 200);
525+
}
526+
502527
}

0 commit comments

Comments
 (0)