Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3402,6 +3402,10 @@ If you wish the commits to be performed in the reverse order (Kafka first), use

See <<ex-jdbc-sync>> for examples of an application that synchronizes JDBC and Kafka transactions in Kafka-first or DB-first configurations.

NOTE: Starting with versions 2.5.17, 2.6.12, 2.7.9 and 2.8.0, if the commit fails on the synchronized transaction (after the primary transaction has committed), the exception will be thrown to the caller.
Previously, this was silently ignored (logged at debug).
Applications should take remedial action, if necessary, to compensate for the comitted primary transaction.

[[container-transaction-manager]]
===== Using Consumer-Initiated Transactions

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -145,13 +145,15 @@ protected boolean shouldReleaseBeforeCompletion() {
return false;
}

@Override
protected void processResourceAfterCommit(KafkaResourceHolder<K, V> resourceHolder) {
resourceHolder.commit();
}

@Override
public void afterCompletion(int status) {
try {
if (status == TransactionSynchronization.STATUS_COMMITTED) {
this.resourceHolder.commit();
}
else {
if (status != TransactionSynchronization.STATUS_COMMITTED) {
this.resourceHolder.rollback();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,13 @@
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
import org.springframework.transaction.support.DefaultTransactionStatus;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.concurrent.SettableListenableFuture;

Expand Down Expand Up @@ -296,14 +299,15 @@ public void testTransactionSynchronizationExceptionOnCommit() {

ResourcelessTransactionManager tm = new ResourcelessTransactionManager();

new TransactionTemplate(tm)
.execute(s -> {
template.sendDefault("foo", "bar");
assertThatExceptionOfType(ProducerFencedException.class).isThrownBy(() ->
new TransactionTemplate(tm)
.execute(s -> {
template.sendDefault("foo", "bar");

// Mark the mock producer as fenced so it throws when committing the transaction
producer.fenceProducer();
return null;
});
// Mark the mock producer as fenced so it throws when committing the transaction
producer.fenceProducer();
return null;
}));

assertThat(producer.transactionCommitted()).isFalse();
assertThat(producer.closed()).isTrue();
Expand Down Expand Up @@ -574,6 +578,28 @@ void testNonTxWithTx() {
pf.destroy();
}

@Test
void syncCommitFails() {
DummyTM tm = new DummyTM();
MockProducer<String, String> producer =
new MockProducer<>(true, new StringSerializer(), new StringSerializer());
producer.initTransactions();
producer.commitTransactionException = new IllegalStateException();

@SuppressWarnings("unchecked")
ProducerFactory<String, String> pf = mock(ProducerFactory.class);
given(pf.transactionCapable()).willReturn(true);
given(pf.createProducer(isNull())).willReturn(producer);

KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(STRING_KEY_TOPIC);

assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() ->
new TransactionTemplate(tm).execute(status -> template.sendDefault("foo")));

assertThat(tm.committed).isTrue();
}

@Configuration
@EnableTransactionManagement
public static class DeclarativeConfig {
Expand Down Expand Up @@ -681,4 +707,29 @@ public void anotherTxMethod() {

}

@SuppressWarnings("serial")
private static final class DummyTM extends AbstractPlatformTransactionManager {

boolean committed;

@Override
protected Object doGetTransaction() throws TransactionException {
return new Object();
}

@Override
protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
}

@Override
protected void doCommit(DefaultTransactionStatus status) throws TransactionException {
this.committed = true;
}

@Override
protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
}

}

}