Skip to content

Commit 505bd84

Browse files
authored
GH-2001: Propagate Exception from TX Sync Commit
Resolves #2001 Previously, synchronized transaction commits (for producer initiated transactions) were not propagated to the caller. **cherry-pick to all supported branches** * Fix typo in doc
1 parent 82cb03e commit 505bd84

File tree

3 files changed

+69
-12
lines changed

3 files changed

+69
-12
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3402,6 +3402,10 @@ If you wish the commits to be performed in the reverse order (Kafka first), use
34023402

34033403
See <<ex-jdbc-sync>> for examples of an application that synchronizes JDBC and Kafka transactions in Kafka-first or DB-first configurations.
34043404

3405+
NOTE: Starting with versions 2.5.17, 2.6.12, 2.7.9 and 2.8.0, if the commit fails on the synchronized transaction (after the primary transaction has committed), the exception will be thrown to the caller.
3406+
Previously, this was silently ignored (logged at debug).
3407+
Applications should take remedial action, if necessary, to compensate for the committed primary transaction.
3408+
34053409
[[container-transaction-manager]]
34063410
===== Using Consumer-Initiated Transactions
34073411

spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -145,13 +145,15 @@ protected boolean shouldReleaseBeforeCompletion() {
145145
return false;
146146
}
147147

148+
@Override
149+
protected void processResourceAfterCommit(KafkaResourceHolder<K, V> resourceHolder) {
150+
resourceHolder.commit();
151+
}
152+
148153
@Override
149154
public void afterCompletion(int status) {
150155
try {
151-
if (status == TransactionSynchronization.STATUS_COMMITTED) {
152-
this.resourceHolder.commit();
153-
}
154-
else {
156+
if (status != TransactionSynchronization.STATUS_COMMITTED) {
155157
this.resourceHolder.rollback();
156158
}
157159
}

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java

Lines changed: 58 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,13 @@
7272
import org.springframework.kafka.test.context.EmbeddedKafka;
7373
import org.springframework.kafka.test.utils.KafkaTestUtils;
7474
import org.springframework.kafka.transaction.KafkaTransactionManager;
75+
import org.springframework.transaction.TransactionDefinition;
76+
import org.springframework.transaction.TransactionException;
7577
import org.springframework.transaction.annotation.EnableTransactionManagement;
7678
import org.springframework.transaction.annotation.Propagation;
7779
import org.springframework.transaction.annotation.Transactional;
7880
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
81+
import org.springframework.transaction.support.DefaultTransactionStatus;
7982
import org.springframework.transaction.support.TransactionTemplate;
8083
import org.springframework.util.concurrent.SettableListenableFuture;
8184

@@ -296,14 +299,15 @@ public void testTransactionSynchronizationExceptionOnCommit() {
296299

297300
ResourcelessTransactionManager tm = new ResourcelessTransactionManager();
298301

299-
new TransactionTemplate(tm)
300-
.execute(s -> {
301-
template.sendDefault("foo", "bar");
302+
assertThatExceptionOfType(ProducerFencedException.class).isThrownBy(() ->
303+
new TransactionTemplate(tm)
304+
.execute(s -> {
305+
template.sendDefault("foo", "bar");
302306

303-
// Mark the mock producer as fenced so it throws when committing the transaction
304-
producer.fenceProducer();
305-
return null;
306-
});
307+
// Mark the mock producer as fenced so it throws when committing the transaction
308+
producer.fenceProducer();
309+
return null;
310+
}));
307311

308312
assertThat(producer.transactionCommitted()).isFalse();
309313
assertThat(producer.closed()).isTrue();
@@ -574,6 +578,28 @@ void testNonTxWithTx() {
574578
pf.destroy();
575579
}
576580

581+
@Test
582+
void syncCommitFails() {
583+
DummyTM tm = new DummyTM();
584+
MockProducer<String, String> producer =
585+
new MockProducer<>(true, new StringSerializer(), new StringSerializer());
586+
producer.initTransactions();
587+
producer.commitTransactionException = new IllegalStateException();
588+
589+
@SuppressWarnings("unchecked")
590+
ProducerFactory<String, String> pf = mock(ProducerFactory.class);
591+
given(pf.transactionCapable()).willReturn(true);
592+
given(pf.createProducer(isNull())).willReturn(producer);
593+
594+
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
595+
template.setDefaultTopic(STRING_KEY_TOPIC);
596+
597+
assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() ->
598+
new TransactionTemplate(tm).execute(status -> template.sendDefault("foo")));
599+
600+
assertThat(tm.committed).isTrue();
601+
}
602+
577603
@Configuration
578604
@EnableTransactionManagement
579605
public static class DeclarativeConfig {
@@ -681,4 +707,29 @@ public void anotherTxMethod() {
681707

682708
}
683709

710+
@SuppressWarnings("serial")
711+
private static final class DummyTM extends AbstractPlatformTransactionManager {
712+
713+
boolean committed;
714+
715+
@Override
716+
protected Object doGetTransaction() throws TransactionException {
717+
return new Object();
718+
}
719+
720+
@Override
721+
protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
722+
}
723+
724+
@Override
725+
protected void doCommit(DefaultTransactionStatus status) throws TransactionException {
726+
this.committed = true;
727+
}
728+
729+
@Override
730+
protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
731+
}
732+
733+
}
734+
684735
}

0 commit comments

Comments
 (0)