diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java
index 7a17489596..afc5d7587a 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java
@@ -55,6 +55,7 @@
 import org.springframework.kafka.support.TransactionSupport;
 import org.springframework.lang.Nullable;
 import org.springframework.util.Assert;
+import org.springframework.util.StringUtils;
 
 /**
  * The {@link ProducerFactory} implementation for a {@code singleton} shared
@@ -92,7 +93,7 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
 	 */
 	public static final Duration DEFAULT_PHYSICAL_CLOSE_TIMEOUT = Duration.ofSeconds(30);
 
-	private static final Log logger = LogFactory.getLog(DefaultKafkaProducerFactory.class); // NOSONAR
+	private static final Log LOGGER = LogFactory.getLog(DefaultKafkaProducerFactory.class);
 
 	private final Map<String, Object> configs;
@@ -102,8 +103,6 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
 
 	private final Map<String, Producer<K, V>> consumerProducers = new HashMap<>();
 
-	private volatile CloseSafeProducer<K, V> producer;
-
 	private Serializer<K> keySerializer;
 
 	private Serializer<V> valueSerializer;
@@ -116,6 +115,8 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
 
 	private boolean producerPerConsumerPartition = true;
 
+	private volatile CloseSafeProducer<K, V> producer;
+
 	/**
 	 * Construct a factory with the provided configuration.
 	 * @param configs the configuration.
@@ -126,6 +127,9 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs) {
 
 	/**
 	 * Construct a factory with the provided configuration and {@link Serializer}s.
+	 * Also configures the {@link #transactionIdPrefix} from the
+	 * {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} value, if provided.
+	 * That config value is overridden with a suffix for each target {@link Producer} instance.
 	 * @param configs the configuration.
 	 * @param keySerializer the key {@link Serializer}.
 	 * @param valueSerializer the value {@link Serializer}.
@@ -133,9 +137,20 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs) {
 	public DefaultKafkaProducerFactory(Map<String, Object> configs,
 			@Nullable Serializer<K> keySerializer,
 			@Nullable Serializer<V> valueSerializer) {
+
 		this.configs = new HashMap<>(configs);
 		this.keySerializer = keySerializer;
 		this.valueSerializer = valueSerializer;
+
+		String txId = (String) this.configs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+		if (StringUtils.hasText(txId)) {
+			setTransactionIdPrefix(txId);
+			if (LOGGER.isInfoEnabled()) {
+				LOGGER.info("The 'transactional.id' config value '" + txId + "' is used as a prefix "
+						+ "and will be suffixed with a number to support concurrent transactions, "
+						+ "unless 'setTransactionIdPrefix()' is called to override it.");
+			}
+		}
 	}
 
 	@Override
@@ -152,7 +167,7 @@ public void setValueSerializer(@Nullable Serializer<V> valueSerializer) {
 	}
 
 	/**
-	 * The time to wait when physically closing the producer (when {@link #stop()} or {@link #destroy()} is invoked).
+	 * The time to wait when physically closing the producer (when {@link #reset()} or {@link #destroy()} is invoked).
 	 * Specified in seconds; default {@link #DEFAULT_PHYSICAL_CLOSE_TIMEOUT}.
 	 * @param physicalCloseTimeout the timeout in seconds.
 	 * @since 1.0.7
@@ -162,23 +177,29 @@ public void setPhysicalCloseTimeout(int physicalCloseTimeout) {
 	}
 
 	/**
-	 * Set the transactional.id prefix.
+	 * Set a prefix for the {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} config.
+	 * By default, the {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} value from the provided
+	 * configs, if any, is used as the prefix in the target producer configs.
 	 * @param transactionIdPrefix the prefix.
 	 * @since 1.3
 	 */
-	public void setTransactionIdPrefix(String transactionIdPrefix) {
+	public final void setTransactionIdPrefix(String transactionIdPrefix) {
 		Assert.notNull(transactionIdPrefix, "'transactionIdPrefix' cannot be null");
 		this.transactionIdPrefix = transactionIdPrefix;
 		enableIdempotentBehaviour();
 	}
 
+	protected String getTransactionIdPrefix() {
+		return this.transactionIdPrefix;
+	}
+
 	/**
	 * When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream.
 	 */
 	private void enableIdempotentBehaviour() {
 		Object previousValue = this.configs.putIfAbsent(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
-		if (logger.isDebugEnabled() && Boolean.FALSE.equals(previousValue)) {
-			logger.debug("The '" + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG +
+		if (LOGGER.isDebugEnabled() && Boolean.FALSE.equals(previousValue)) {
+			LOGGER.debug("The '" + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG +
 					"' is set to false; this may result in duplicate messages");
 		}
 	}
@@ -233,13 +254,13 @@ public void destroy() {
 				producerToClose.delegate.close(this.physicalCloseTimeout);
 			}
 			catch (Exception e) {
-				logger.error("Exception while closing producer", e);
+				LOGGER.error("Exception while closing producer", e);
 			}
 			producerToClose = this.cache.poll();
 		}
 		synchronized (this.consumerProducers) {
 			this.consumerProducers.forEach(
-				(k, v) -> v.delegate.close(this.physicalCloseTimeout));
+					(k, v) -> v.delegate.close(this.physicalCloseTimeout));
 			this.consumerProducers.clear();
 		}
 	}
@@ -251,47 +272,13 @@ public void onApplicationEvent(ContextStoppedEvent event) {
 		}
 	}
 
-	/**
-	 * NoOp.
-	 * @deprecated {@link org.springframework.context.Lifecycle} is no longer implemented.
-	 */
-	@Deprecated
-	public void start() {
-		// NOSONAR
-	}
-
-	/**
-	 * NoOp.
-	 * @deprecated {@link org.springframework.context.Lifecycle} is no longer implemented;
-	 * use {@link #reset()} to close the {@link Producer}(s).
-	 */
-	@Deprecated
-	public void stop() {
-		reset();
-	}
-
 	/**
 	 * Close the {@link Producer}(s) and clear the cache of transactional
 	 * {@link Producer}(s).
 	 * @since 2.2
 	 */
 	public void reset() {
-		try {
-			destroy();
-		}
-		catch (Exception e) {
-			logger.error("Exception while closing producer", e);
-		}
-	}
-
-	/**
-	 * NoOp.
-	 * @return always true.
-	 * @deprecated {@link org.springframework.context.Lifecycle} is no longer implemented.
-	 */
-	@Deprecated
-	public boolean isRunning() {
-		return true;
+		destroy();
 	}
 
 	@Override
@@ -307,7 +294,7 @@ public Producer<K, V> createProducer() {
 		if (this.producer == null) {
 			synchronized (this) {
 				if (this.producer == null) {
-					this.producer = new CloseSafeProducer<K, V>(createKafkaProducer());
+					this.producer = new CloseSafeProducer<>(createKafkaProducer());
 				}
 			}
 		}
@@ -320,7 +307,7 @@ public Producer<K, V> createProducer() {
 	 * @return the producer.
 	 */
 	protected Producer<K, V> createKafkaProducer() {
-		return new KafkaProducer<K, V>(this.configs, this.keySerializer, this.valueSerializer);
+		return new KafkaProducer<>(this.configs, this.keySerializer, this.valueSerializer);
 	}
 
 	Producer<K, V> createTransactionalProducerForPartition() {
@@ -370,13 +357,15 @@ protected Producer<K, V> createTransactionalProducer() {
 		}
 	}
 
-	private CloseSafeProducer<K, V> doCreateTxProducer(String suffix, Consumer<CloseSafeProducer<K, V>> remover) {
+	private CloseSafeProducer<K, V> doCreateTxProducer(String suffix,
+			@Nullable Consumer<CloseSafeProducer<K, V>> remover) {
+
 		Producer<K, V> newProducer;
 		Map<String, Object> newProducerConfigs = new HashMap<>(this.configs);
 		newProducerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, this.transactionIdPrefix + suffix);
-		newProducer = new KafkaProducer<K, V>(newProducerConfigs, this.keySerializer, this.valueSerializer);
+		newProducer = new KafkaProducer<>(newProducerConfigs, this.keySerializer, this.valueSerializer);
 		newProducer.initTransactions();
-		return new CloseSafeProducer<K, V>(newProducer, this.cache, remover,
+		return new CloseSafeProducer<>(newProducer, this.cache, remover,
 				(String) newProducerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
 	}
@@ -471,15 +460,15 @@ public void initTransactions() {
 
 		@Override
 		public void beginTransaction() throws ProducerFencedException {
-			if (logger.isDebugEnabled()) {
-				logger.debug("beginTransaction: " + this);
+			if (LOGGER.isDebugEnabled()) {
+				LOGGER.debug("beginTransaction: " + this);
 			}
 			try {
 				this.delegate.beginTransaction();
 			}
 			catch (RuntimeException e) {
-				if (logger.isErrorEnabled()) {
-					logger.error("beginTransaction failed: " + this, e);
+				if (LOGGER.isErrorEnabled()) {
+					LOGGER.error("beginTransaction failed: " + this, e);
 				}
 				this.txFailed = true;
 				throw e;
@@ -495,15 +484,15 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
 
 		@Override
 		public void commitTransaction() throws ProducerFencedException {
-			if (logger.isDebugEnabled()) {
-				logger.debug("commitTransaction: " + this);
+			if (LOGGER.isDebugEnabled()) {
+				LOGGER.debug("commitTransaction: " + this);
 			}
 			try {
 				this.delegate.commitTransaction();
 			}
 			catch (RuntimeException e) {
-				if (logger.isErrorEnabled()) {
-					logger.error("commitTransaction failed: " + this, e);
+				if (LOGGER.isErrorEnabled()) {
+					LOGGER.error("commitTransaction failed: " + this, e);
 				}
 				this.txFailed = true;
 				throw e;
@@ -512,15 +501,15 @@ public void commitTransaction() throws ProducerFencedException {
 
 		@Override
 		public void abortTransaction() throws ProducerFencedException {
-			if (logger.isDebugEnabled()) {
-				logger.debug("abortTransaction: " + this);
+			if (LOGGER.isDebugEnabled()) {
+				LOGGER.debug("abortTransaction: " + this);
 			}
 			try {
 				this.delegate.abortTransaction();
 			}
 			catch (RuntimeException e) {
-				if (logger.isErrorEnabled()) {
-					logger.error("Abort failed: " + this, e);
+				if (LOGGER.isErrorEnabled()) {
+					LOGGER.error("Abort failed: " + this, e);
 				}
 				this.txFailed = true;
 				throw e;
@@ -543,9 +532,10 @@ public void close(long timeout, @Nullable TimeUnit unit) {
 		public void close(@Nullable Duration timeout) {
 			if (this.cache != null) {
 				if (this.txFailed) {
-					if (logger.isWarnEnabled()) {
-						logger.warn("Error during transactional operation; producer removed from cache; possible cause: "
-								+ "broker restarted during transaction: " + this);
+					if (LOGGER.isWarnEnabled()) {
+						LOGGER.warn("Error during transactional operation; producer removed from cache; "
+								+ "possible cause: "
+								+ "broker restarted during transaction: " + this);
 					}
 					if (timeout == null) {
 						this.delegate.close();
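For context, a minimal sketch of how the new constructor behavior can be used: supplying `transactional.id` in the producer configs now seeds the factory's transaction-id prefix, which the factory then suffixes per physical producer. The broker address, serializer choices, and prefix value below are illustrative placeholders, not taken from this patch.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    import org.springframework.kafka.core.DefaultKafkaProducerFactory;

    // Sketch only: exercises the constructor behavior introduced above.
    // The broker address and prefix value are illustrative placeholders.
    public class TransactionalIdPrefixSketch {

        public static DefaultKafkaProducerFactory<String, String> producerFactory() {
            Map<String, Object> configs = new HashMap<>();
            configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            // Picked up by the new constructor logic as the transaction-id prefix;
            // each physical producer gets the prefix plus a suffix, e.g. "my.transaction.0".
            configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my.transaction.");
            // Calling setTransactionIdPrefix() afterwards would override the config-derived prefix.
            return new DefaultKafkaProducerFactory<>(configs);
        }

    }
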
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java
index b7a0cd228f..2c8c9e484d 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java
@@ -17,7 +17,8 @@
 package org.springframework.kafka.core;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.BDDMockito.given;
@@ -74,6 +75,7 @@
 /**
  * @author Gary Russell
  * @author Nakul Mishra
+ * @author Artem Bilan
  *
  * @since 1.3
  *
@@ -90,12 +92,12 @@ public class KafkaTemplateTransactionTests {
 	private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
 
 	@Test
-	public void testLocalTransaction() throws Exception {
+	public void testLocalTransaction() {
 		Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
 		senderProps.put(ProducerConfig.RETRIES_CONFIG, 1);
+		senderProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my.transaction.");
 		DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
 		pf.setKeySerializer(new StringSerializer());
-		pf.setTransactionIdPrefix("my.transaction.");
 		KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
 		template.setDefaultTopic(STRING_KEY_TOPIC);
 		Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testLocalTx", "false", embeddedKafka);
@@ -126,7 +128,7 @@ record = iterator.next();
 	}
 
 	@Test
-	public void testGlobalTransaction() throws Exception {
+	public void testGlobalTransaction() {
 		Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
 		senderProps.put(ProducerConfig.RETRIES_CONFIG, 1);
 		DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
@@ -142,11 +144,12 @@ public void testGlobalTransaction() throws Exception {
 		embeddedKafka.consumeFromAnEmbeddedTopic(consumer, STRING_KEY_TOPIC);
 		KafkaTransactionManager<String, String> tm = new KafkaTransactionManager<>(pf);
 		tm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
-		new TransactionTemplate(tm).execute(s -> {
-			template.sendDefault("foo", "bar");
-			template.sendDefault("baz", "qux");
-			return null;
-		});
+		new TransactionTemplate(tm)
+				.execute(s -> {
+					template.sendDefault("foo", "bar");
+					template.sendDefault("baz", "qux");
+					return null;
+				});
 		ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
 		Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
 		ConsumerRecord<String, String> record = iterator.next();
@@ -185,26 +188,22 @@ public void testDeclarative() {
 	}
 
 	@Test
-	public void testDefaultProducerIdempotentConfig() throws Exception {
+	public void testDefaultProducerIdempotentConfig() {
 		Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
-		DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(
-				senderProps);
+		DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
 		pf.setTransactionIdPrefix("my.transaction.");
 		pf.destroy();
-		assertThat(pf.getConfigurationProperties()
-				.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)).isEqualTo(true);
+		assertThat(pf.getConfigurationProperties().get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)).isEqualTo(true);
 	}
 
 	@Test
-	public void testOverrideProducerIdempotentConfig() throws Exception {
+	public void testOverrideProducerIdempotentConfig() {
 		Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
 		senderProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
-		DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(
-				senderProps);
+		DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
 		pf.setTransactionIdPrefix("my.transaction.");
 		pf.destroy();
-		assertThat(pf.getConfigurationProperties()
-				.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)).isEqualTo(false);
+		assertThat(pf.getConfigurationProperties().get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)).isEqualTo(false);
 	}
 
 	@Test
@@ -216,9 +215,9 @@ public void testNoTx() {
 		pf.setTransactionIdPrefix("my.transaction.");
 		KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
 		template.setDefaultTopic(STRING_KEY_TOPIC);
-		assertThatThrownBy(() -> template.send("foo", "bar"))
-				.isInstanceOf(IllegalStateException.class)
-				.hasMessageContaining("No transaction is in process;");
+		assertThatIllegalStateException()
+				.isThrownBy(() -> template.send("foo", "bar"))
+				.withMessageContaining("No transaction is in process;");
 	}
 
 	@Test
@@ -236,10 +235,11 @@ public void testTransactionSynchronization() {
 
 		ResourcelessTransactionManager tm = new ResourcelessTransactionManager();
 
-		new TransactionTemplate(tm).execute(s -> {
-			template.sendDefault("foo", "bar");
-			return null;
-		});
+		new TransactionTemplate(tm)
+				.execute(s -> {
+					template.sendDefault("foo", "bar");
+					return null;
+				});
 
 		assertThat(producer.history()).containsExactly(new ProducerRecord<>(STRING_KEY_TOPIC, "foo", "bar"));
 		assertThat(producer.transactionCommitted()).isTrue();
@@ -261,13 +261,14 @@ public void testTransactionSynchronizationExceptionOnCommit() {
 
 		ResourcelessTransactionManager tm = new ResourcelessTransactionManager();
 
-		new TransactionTemplate(tm).execute(s -> {
-			template.sendDefault("foo", "bar");
+		new TransactionTemplate(tm)
+				.execute(s -> {
+					template.sendDefault("foo", "bar");
 
-			// Mark the mock producer as fenced so it throws when committing the transaction
-			producer.fenceProducer();
-			return null;
-		});
+					// Mark the mock producer as fenced so it throws when committing the transaction
+					producer.fenceProducer();
+					return null;
+				});
 
 		assertThat(producer.transactionCommitted()).isFalse();
 		assertThat(producer.closed()).isTrue();
@@ -291,12 +292,13 @@ public void testDeadLetterPublisherWhileTransactionActive() {
 
 		KafkaTransactionManager<Object, Object> tm = new KafkaTransactionManager<>(pf);
 
-		new TransactionTemplate(tm).execute(s -> {
-			new DeadLetterPublishingRecoverer(template).accept(
-					new ConsumerRecord<>(STRING_KEY_TOPIC, 0, 0L, "key", "foo"),
-					new RuntimeException("foo"));
-			return null;
-		});
+		new TransactionTemplate(tm)
+				.execute(s -> {
+					new DeadLetterPublishingRecoverer(template).accept(
+							new ConsumerRecord<>(STRING_KEY_TOPIC, 0, 0L, "key", "foo"),
+							new RuntimeException("foo"));
+					return null;
+				});
 
 		verify(producer1).beginTransaction();
 		verify(producer1).commitTransaction();
@@ -318,10 +320,12 @@ public void testNoAbortAfterCommitFailure() {
 		KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
 		template.setDefaultTopic(STRING_KEY_TOPIC);
 
-		assertThatThrownBy(() -> template.executeInTransaction(t -> {
-			producer.fenceProducer();
-			return null;
-		})).isInstanceOf(ProducerFencedException.class);
+		assertThatExceptionOfType(ProducerFencedException.class)
+				.isThrownBy(() ->
+						template.executeInTransaction(t -> {
+							producer.fenceProducer();
+							return null;
+						}));
 
 		assertThat(producer.transactionCommitted()).isFalse();
 		assertThat(producer.transactionAborted()).isFalse();
@@ -343,9 +347,8 @@ public void testFencedOnBegin() {
 		KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
 		template.setDefaultTopic(STRING_KEY_TOPIC);
 
-		assertThatThrownBy(() -> template.executeInTransaction(t -> {
-			return null;
-		})).isInstanceOf(ProducerFencedException.class);
+		assertThatExceptionOfType(ProducerFencedException.class)
+				.isThrownBy(() -> template.executeInTransaction(t -> null));
 
 		assertThat(producer.transactionCommitted()).isFalse();
 		assertThat(producer.transactionAborted()).isFalse();
@@ -366,9 +369,12 @@ public void testAbort() {
 		KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
 		template.setDefaultTopic(STRING_KEY_TOPIC);
 
-		assertThatThrownBy(() -> template.executeInTransaction(t -> {
-			throw new RuntimeException("foo");
-		})).isExactlyInstanceOf(RuntimeException.class).withFailMessage("foo");
+		assertThatExceptionOfType(RuntimeException.class)
+				.isThrownBy(() ->
+						template.executeInTransaction(t -> {
+							throw new RuntimeException("foo");
+						}))
+				.withMessage("foo");
 
 		assertThat(producer.transactionCommitted()).isFalse();
 		assertThat(producer.transactionAborted()).isTrue();
@@ -385,20 +391,21 @@ public void testExecuteInTransactionNewInnerTx() {
 		producer1.initTransactions();
 		AtomicBoolean first = new AtomicBoolean(true);
 
-		DefaultKafkaProducerFactory<Object, Object> pf = new DefaultKafkaProducerFactory<Object, Object>(
-				Collections.emptyMap()) {
+		DefaultKafkaProducerFactory<Object, Object> pf =
+				new DefaultKafkaProducerFactory<Object, Object>(
+						Collections.emptyMap()) {
 
-			@Override
-			protected Producer<Object, Object> createTransactionalProducer() {
-				return first.getAndSet(false) ? producer1 : producer2;
-			}
+					@Override
+					protected Producer<Object, Object> createTransactionalProducer() {
+						return first.getAndSet(false) ? producer1 : producer2;
+					}
 
-			@Override
-			Producer<Object, Object> createTransactionalProducerForPartition() {
-				return createTransactionalProducer();
-			}
+					@Override
+					Producer<Object, Object> createTransactionalProducerForPartition() {
+						return createTransactionalProducer();
+					}
 
-		};
+				};
 
 		pf.setTransactionIdPrefix("tx.");
 		KafkaTemplate<Object, Object> template = new KafkaTemplate<>(pf);
@@ -408,12 +415,12 @@ Producer<Object, Object> createTransactionalProducerForPartition() {
 
 		try {
 			TransactionSupport.setTransactionIdSuffix("testExecuteInTransactionNewInnerTx");
-			new TransactionTemplate(tm).execute(s -> {
-				return template.executeInTransaction(t -> {
-					template.sendDefault("foo", "bar");
-					return null;
-				});
-			});
+			new TransactionTemplate(tm)
+					.execute(s ->
+							template.executeInTransaction(t -> {
+								template.sendDefault("foo", "bar");
+								return null;
+							}));
 
 			InOrder inOrder = inOrder(producer1, producer2);
 			inOrder.verify(producer1).beginTransaction();
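The migrated tests above exercise KafkaTemplate.executeInTransaction for local transactions; a hedged sketch of the same pattern outside a test follows. The topic name is a placeholder, and the factory is assumed to have a transaction-id prefix configured (for example, via the `transactional.id` config shown after the main class's diff).

    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;

    // Sketch only: mirrors the local-transaction flow of testLocalTransaction above.
    // "some-topic" is a placeholder; the factory must have a transaction-id prefix set.
    public class LocalTransactionSketch {

        public static void sendAtomically(DefaultKafkaProducerFactory<String, String> pf) {
            KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
            template.setDefaultTopic("some-topic");
            // Both sends commit atomically; an exception thrown from the callback aborts the transaction.
            template.executeInTransaction(t -> {
                t.sendDefault("foo", "bar");
                t.sendDefault("baz", "qux");
                return null;
            });
        }

    }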