Skip to content

Commit fa47642

Browse files
garyrussellartembilan
authored andcommitted
GH-2001: Propagate Exception from TX Sync Commit
Resolves #2001 Previously, synchronized transaction commits (for producer initiated transactions) were not propagated to the caller. **cherry-pick to all supported branches** * Fix typo in doc # Conflicts: # src/reference/asciidoc/kafka.adoc
1 parent 4905284 commit fa47642

File tree

3 files changed

+69
-12
lines changed

3 files changed

+69
-12
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -145,13 +145,15 @@ protected boolean shouldReleaseBeforeCompletion() {
145145
return false;
146146
}
147147

148+
@Override
149+
protected void processResourceAfterCommit(KafkaResourceHolder<K, V> resourceHolder) {
150+
resourceHolder.commit();
151+
}
152+
148153
@Override
149154
public void afterCompletion(int status) {
150155
try {
151-
if (status == TransactionSynchronization.STATUS_COMMITTED) {
152-
this.resourceHolder.commit();
153-
}
154-
else {
156+
if (status != TransactionSynchronization.STATUS_COMMITTED) {
155157
this.resourceHolder.rollback();
156158
}
157159
}

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java

Lines changed: 58 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,13 @@
7272
import org.springframework.kafka.test.context.EmbeddedKafka;
7373
import org.springframework.kafka.test.utils.KafkaTestUtils;
7474
import org.springframework.kafka.transaction.KafkaTransactionManager;
75+
import org.springframework.transaction.TransactionDefinition;
76+
import org.springframework.transaction.TransactionException;
7577
import org.springframework.transaction.annotation.EnableTransactionManagement;
7678
import org.springframework.transaction.annotation.Propagation;
7779
import org.springframework.transaction.annotation.Transactional;
7880
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
81+
import org.springframework.transaction.support.DefaultTransactionStatus;
7982
import org.springframework.transaction.support.TransactionTemplate;
8083
import org.springframework.util.concurrent.SettableListenableFuture;
8184

@@ -296,14 +299,15 @@ public void testTransactionSynchronizationExceptionOnCommit() {
296299

297300
ResourcelessTransactionManager tm = new ResourcelessTransactionManager();
298301

299-
new TransactionTemplate(tm)
300-
.execute(s -> {
301-
template.sendDefault("foo", "bar");
302+
assertThatExceptionOfType(ProducerFencedException.class).isThrownBy(() ->
303+
new TransactionTemplate(tm)
304+
.execute(s -> {
305+
template.sendDefault("foo", "bar");
302306

303-
// Mark the mock producer as fenced so it throws when committing the transaction
304-
producer.fenceProducer();
305-
return null;
306-
});
307+
// Mark the mock producer as fenced so it throws when committing the transaction
308+
producer.fenceProducer();
309+
return null;
310+
}));
307311

308312
assertThat(producer.transactionCommitted()).isFalse();
309313
assertThat(producer.closed()).isTrue();
@@ -573,6 +577,28 @@ void testNonTxWithTx() {
573577
pf.destroy();
574578
}
575579

580+
@Test
581+
void syncCommitFails() {
582+
DummyTM tm = new DummyTM();
583+
MockProducer<String, String> producer =
584+
new MockProducer<>(true, new StringSerializer(), new StringSerializer());
585+
producer.initTransactions();
586+
producer.commitTransactionException = new IllegalStateException();
587+
588+
@SuppressWarnings("unchecked")
589+
ProducerFactory<String, String> pf = mock(ProducerFactory.class);
590+
given(pf.transactionCapable()).willReturn(true);
591+
given(pf.createProducer(isNull())).willReturn(producer);
592+
593+
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
594+
template.setDefaultTopic(STRING_KEY_TOPIC);
595+
596+
assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() ->
597+
new TransactionTemplate(tm).execute(status -> template.sendDefault("foo")));
598+
599+
assertThat(tm.committed).isTrue();
600+
}
601+
576602
@Configuration
577603
@EnableTransactionManagement
578604
public static class DeclarativeConfig {
@@ -680,4 +706,29 @@ public void anotherTxMethod() {
680706

681707
}
682708

709+
@SuppressWarnings("serial")
710+
private static final class DummyTM extends AbstractPlatformTransactionManager {
711+
712+
boolean committed;
713+
714+
@Override
715+
protected Object doGetTransaction() throws TransactionException {
716+
return new Object();
717+
}
718+
719+
@Override
720+
protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
721+
}
722+
723+
@Override
724+
protected void doCommit(DefaultTransactionStatus status) throws TransactionException {
725+
this.committed = true;
726+
}
727+
728+
@Override
729+
protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
730+
}
731+
732+
}
733+
683734
}

src/reference/asciidoc/kafka.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3316,6 +3316,10 @@ The interceptor for the `@Transactional` annotation starts the transaction and t
33163316
When the method exits, the database transaction will commit followed by the Kafka transaction.
33173317
If you wish the commits to be performed in the reverse order (Kafka first), use a <<chained-transaction-manager,`ChainedTransactionManager`>> configured with the `DataSourceTransactionManager` as the first TM and then the `KafkaTransactionManager`, in that order.
33183318

3319+
NOTE: Starting with versions 2.5.17, 2.6.12, 2.7.9 and 2.8.0, if the commit fails on the synchronized transaction (after the primary transaction has committed), the exception will be thrown to the caller.
3320+
Previously, this was silently ignored (logged at debug).
3321+
Applications should take remedial action, if necessary, to compensate for the committed primary transaction.
3322+
33193323
[[chained-transaction-manager]]
33203324
===== Using `ChainedKafkaTransactionManager`
33213325

0 commit comments

Comments
 (0)