From 206d8b0402aaa8328c4ad8f81b1f5f959704e9d1 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Tue, 9 Nov 2021 11:15:15 -0500 Subject: [PATCH 1/2] GH-2001: Propagate Exception from TX Sync Commit Resolves https://github.com/spring-projects/spring-kafka/issues/2001 Previously, synchronized transaction commits (for producer initiated transactions) were not propagated to the caller. **cherry-pick to all supported branches** --- .../src/main/asciidoc/kafka.adoc | 4 ++ .../kafka/core/ProducerFactoryUtils.java | 12 ++-- .../core/KafkaTemplateTransactionTests.java | 65 +++++++++++++++++-- 3 files changed, 69 insertions(+), 12 deletions(-) diff --git a/spring-kafka-docs/src/main/asciidoc/kafka.adoc b/spring-kafka-docs/src/main/asciidoc/kafka.adoc index ed332e4e9e..84df24caee 100644 --- a/spring-kafka-docs/src/main/asciidoc/kafka.adoc +++ b/spring-kafka-docs/src/main/asciidoc/kafka.adoc @@ -3402,6 +3402,10 @@ If you wish the commits to be performed in the reverse order (Kafka first), use See <> for examples of an application that synchronizes JDBC and Kafka transactions in Kafka-first or DB-first configurations. +NOTE: Starting with versions 2.5.17, 2.6.12, 2.7.9 and 2.8.0, if the commit fails on the synchronized transaction (after the primary transaction has committed), the exception will be thrown to the caller. +Previously, this was silently ignored (logged at debug). +Applications should take remedial action, if necessary, to compensate for the comitted primary transaction. + [[container-transaction-manager]] ===== Using Consumer-Initiated Transactions diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java index da02f657e1..7a5b2c196a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -145,13 +145,15 @@ protected boolean shouldReleaseBeforeCompletion() { return false; } + @Override + protected void processResourceAfterCommit(KafkaResourceHolder resourceHolder) { + resourceHolder.commit(); + } + @Override public void afterCompletion(int status) { try { - if (status == TransactionSynchronization.STATUS_COMMITTED) { - this.resourceHolder.commit(); - } - else { + if (status != TransactionSynchronization.STATUS_COMMITTED) { this.resourceHolder.rollback(); } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java index abefc097c0..9a9434e825 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java @@ -72,10 +72,13 @@ import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.kafka.transaction.KafkaTransactionManager; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionException; import org.springframework.transaction.annotation.EnableTransactionManagement; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.support.AbstractPlatformTransactionManager; +import org.springframework.transaction.support.DefaultTransactionStatus; import org.springframework.transaction.support.TransactionTemplate; import org.springframework.util.concurrent.SettableListenableFuture; @@ -296,14 +299,15 @@ public void testTransactionSynchronizationExceptionOnCommit() { ResourcelessTransactionManager tm = new ResourcelessTransactionManager(); - new TransactionTemplate(tm) - .execute(s -> { - template.sendDefault("foo", "bar"); + assertThatExceptionOfType(ProducerFencedException.class).isThrownBy(() -> + new TransactionTemplate(tm) + .execute(s -> { + template.sendDefault("foo", "bar"); - // Mark the mock producer as fenced so it throws when committing the transaction - producer.fenceProducer(); - return null; - }); + // Mark the mock producer as fenced so it throws when committing the transaction + producer.fenceProducer(); + return null; + })); assertThat(producer.transactionCommitted()).isFalse(); assertThat(producer.closed()).isTrue(); @@ -574,6 +578,28 @@ void testNonTxWithTx() { pf.destroy(); } + @Test + void syncCommitFails() { + DummyTM tm = new DummyTM(); + MockProducer producer = + new MockProducer<>(true, new StringSerializer(), new StringSerializer()); + producer.initTransactions(); + producer.commitTransactionException = new IllegalStateException(); + + @SuppressWarnings("unchecked") + ProducerFactory pf = mock(ProducerFactory.class); + given(pf.transactionCapable()).willReturn(true); + given(pf.createProducer(isNull())).willReturn(producer); + + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setDefaultTopic(STRING_KEY_TOPIC); + + assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> + new TransactionTemplate(tm).execute(status -> template.sendDefault("foo"))); + + assertThat(tm.committed).isTrue(); + } + @Configuration @EnableTransactionManagement public static class DeclarativeConfig { @@ -681,4 +707,29 @@ public void anotherTxMethod() { } + @SuppressWarnings("serial") + private static final class DummyTM extends AbstractPlatformTransactionManager { + + boolean committed; + + @Override + protected Object doGetTransaction() throws TransactionException { + return new Object(); + } + + @Override + protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException { + } + + @Override + protected void doCommit(DefaultTransactionStatus status) throws TransactionException { + this.committed = true; + } + + @Override + protected void doRollback(DefaultTransactionStatus status) throws TransactionException { + } + + } + } From 27788c5009bfb791270a70caed7523efc0133b0a Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Tue, 9 Nov 2021 15:37:40 -0500 Subject: [PATCH 2/2] Fix typo in doc --- spring-kafka-docs/src/main/asciidoc/kafka.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-kafka-docs/src/main/asciidoc/kafka.adoc b/spring-kafka-docs/src/main/asciidoc/kafka.adoc index 84df24caee..cbeaf8f79f 100644 --- a/spring-kafka-docs/src/main/asciidoc/kafka.adoc +++ b/spring-kafka-docs/src/main/asciidoc/kafka.adoc @@ -3404,7 +3404,7 @@ See <> for examples of an application that synchronizes JDBC and K NOTE: Starting with versions 2.5.17, 2.6.12, 2.7.9 and 2.8.0, if the commit fails on the synchronized transaction (after the primary transaction has committed), the exception will be thrown to the caller. Previously, this was silently ignored (logged at debug). -Applications should take remedial action, if necessary, to compensate for the comitted primary transaction. +Applications should take remedial action, if necessary, to compensate for the committed primary transaction. [[container-transaction-manager]] ===== Using Consumer-Initiated Transactions