Skip to content

Commit 04110a6

Browse files
garyrussellartembilan
authored andcommitted
GH-683: Fix Transactions with ErrorHandler
Fixes #683 If the error handler handles the error, the offset should be sent to the tx. * Polish ackOnError Javadoc to reflect transactions
1 parent 2ec0e32 commit 04110a6

File tree

3 files changed

+22
-9
lines changed

3 files changed

+22
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,13 @@ public void setIdleEventInterval(Long idleEventInterval) {
351351
* offset of the failed message anyway, so this option has limited applicability.
352352
* Perhaps useful for a component that starts throwing exceptions consistently;
353353
* allowing it to resume when restarted from the last successfully processed message.
354+
* <p>
355+
* Does not apply when transactions are used - in that case, whether or not the
356+
* offsets are sent to the transaction depends on whether the transaction is committed
357+
* or rolled back. If a listener throws an exception, the transaction will normally
358+
* be rolled back unless an error handler is provided that handles the error and
359+
* exits normally; in which case the offsets are sent to the transaction and the
360+
* transaction is committed.
354361
* @param ackOnError whether the container should acknowledge messages that throw
355362
* exceptions.
356363
*/

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1112,12 +1112,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> recor
11121112
this.errorHandler.handle(e, record, this.consumer);
11131113
}
11141114
if (producer != null) {
1115-
try {
1116-
sendOffsetsToTransaction(producer);
1117-
}
1118-
catch (Exception e1) {
1119-
this.logger.error("Send offsets to transaction failed", e1);
1120-
}
1115+
ackCurrent(record, producer);
11211116
}
11221117
}
11231118
catch (RuntimeException ee) {

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,16 +98,21 @@ public class TransactionalContainerTests {
9898

9999
@Test
100100
public void testConsumeAndProduceTransactionKTM() throws Exception {
101-
testConsumeAndProduceTransactionGuts(false);
101+
testConsumeAndProduceTransactionGuts(false, false);
102102
}
103103

104104
@Test
105105
public void testConsumeAndProduceTransactionKCTM() throws Exception {
106-
testConsumeAndProduceTransactionGuts(true);
106+
testConsumeAndProduceTransactionGuts(true, false);
107+
}
108+
109+
@Test
110+
public void testConsumeAndProduceTransactionHandleError() throws Exception {
111+
testConsumeAndProduceTransactionGuts(false, true);
107112
}
108113

109114
@SuppressWarnings({ "rawtypes", "unchecked" })
110-
private void testConsumeAndProduceTransactionGuts(boolean chained) throws Exception {
115+
private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handleError) throws Exception {
111116
Consumer consumer = mock(Consumer.class);
112117
final TopicPartition topicPartition = new TopicPartition("foo", 0);
113118
willAnswer(i -> {
@@ -149,9 +154,15 @@ private void testConsumeAndProduceTransactionGuts(boolean chained) throws Except
149154
final KafkaTemplate template = new KafkaTemplate(pf);
150155
props.setMessageListener((MessageListener) m -> {
151156
template.send("bar", "baz");
157+
if (handleError) {
158+
throw new RuntimeException("fail");
159+
}
152160
});
153161
KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, props);
154162
container.setBeanName("commit");
163+
if (handleError) {
164+
container.setErrorHandler((e, data) -> { });
165+
}
155166
container.start();
156167
assertThat(closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
157168
InOrder inOrder = inOrder(producer);

0 commit comments

Comments
 (0)