Skip to content

Commit 010c3e8

Browse files
garyrussellartembilan
authored andcommitted
GH-661: Close Producer after beginTransaction fail
Fixes #661 I could not reproduce the problem that I reported in the issue; so, if any exception occurs on `beginTransaction()`, throw the exception to the caller after closing the producer and prevent its return to the cache. Also, reading the javadocs for `ProducerFencedException`, we should have been closing the producer if that exception occurred anyway. ``` /** * This fatal exception indicates that another producer with the same <code>transactional.id</code> has been * started. It is only possible to have one producer instance with a <code>transactional.id</code> at any * given time, and the latest one to be started "fences" the previous instances so that they can no longer * make transactional requests. When you encounter this exception, you must close the producer instance. */ ``` Cherry-pick to master, 2.0.x, 1.3.x. * Fix `DefaultKafkaProducerFactoryTests` for its Mockito version and don't use JUnit 5
1 parent e826a58 commit 010c3e8

File tree

3 files changed

+144
-4
lines changed

3 files changed

+144
-4
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -203,10 +203,21 @@ public Producer<K, V> createProducer() {
203203
return this.producer;
204204
}
205205

206+
/**
207+
* Subclasses must return a raw producer which will be wrapped in a
208+
* {@link CloseSafeProducer}.
209+
* @return the producer.
210+
*/
206211
protected Producer<K, V> createKafkaProducer() {
207212
return new KafkaProducer<K, V>(this.configs, this.keySerializer, this.valueSerializer);
208213
}
209214

215+
/**
216+
* Subclasses must return a producer from the {@link #getCache()} or a
217+
* new raw producer wrapped in a {@link CloseSafeProducer}.
218+
* @return the producer - cannot be null.
219+
* @since 1.3
220+
*/
210221
protected Producer<K, V> createTransactionalProducer() {
211222
Producer<K, V> producer = this.cache.poll();
212223
if (producer == null) {
@@ -222,14 +233,28 @@ protected Producer<K, V> createTransactionalProducer() {
222233
}
223234
}
224235

225-
private static class CloseSafeProducer<K, V> implements Producer<K, V> {
236+
protected BlockingQueue<CloseSafeProducer<K, V>> getCache() {
237+
return this.cache;
238+
}
239+
240+
/**
241+
* A wrapper class for the delegate.
242+
*
243+
* @param <K> the key type.
244+
* @param <V> the value type.
245+
*
246+
*/
247+
protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
226248

227249
private final Producer<K, V> delegate;
228250

229251
private final BlockingQueue<CloseSafeProducer<K, V>> cache;
230252

253+
private volatile boolean txFailed;
254+
231255
CloseSafeProducer(Producer<K, V> delegate) {
232256
this(delegate, null);
257+
Assert.isTrue(!(delegate instanceof CloseSafeProducer), "Cannot double-wrap a producer");
233258
}
234259

235260
CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache) {
@@ -269,7 +294,21 @@ public void initTransactions() {
269294

270295
@Override
271296
public void beginTransaction() throws ProducerFencedException {
272-
this.delegate.beginTransaction();
297+
try {
298+
this.delegate.beginTransaction();
299+
}
300+
catch (RuntimeException e) {
301+
this.txFailed = true;
302+
logger.error("Illegal transaction state; producer removed from cache; possible cause: "
303+
+ "broker restarted during transaction", e);
304+
try {
305+
this.delegate.close();
306+
}
307+
catch (Exception ee) {
308+
// empty
309+
}
310+
throw e;
311+
}
273312
}
274313

275314
@Override
@@ -290,7 +329,7 @@ public void abortTransaction() throws ProducerFencedException {
290329

291330
@Override
292331
public void close() {
293-
if (this.cache != null) {
332+
if (this.cache != null && !this.txFailed) {
294333
synchronized (this) {
295334
if (!this.cache.contains(this)) {
296335
this.cache.offer(this);

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,8 +237,8 @@ public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
237237
Producer<K, V> producer = this.producers.get();
238238
Assert.state(producer == null, "Nested calls to 'executeInTransaction' are not allowed");
239239
producer = this.producerFactory.createProducer();
240-
this.producers.set(producer);
241240
producer.beginTransaction();
241+
this.producers.set(producer);
242242
T result = null;
243243
try {
244244
result = callback.doInOperations(this);
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.core;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.BDDMockito.willAnswer;
21+
import static org.mockito.Matchers.any;
22+
import static org.mockito.Mockito.inOrder;
23+
import static org.mockito.Mockito.mock;
24+
25+
import java.util.HashMap;
26+
import java.util.concurrent.BlockingQueue;
27+
import java.util.concurrent.atomic.AtomicInteger;
28+
29+
import org.apache.kafka.clients.producer.Producer;
30+
import org.apache.kafka.common.KafkaException;
31+
import org.junit.Test;
32+
import org.mockito.InOrder;
33+
34+
import org.springframework.kafka.test.utils.KafkaTestUtils;
35+
import org.springframework.kafka.transaction.KafkaTransactionManager;
36+
import org.springframework.transaction.CannotCreateTransactionException;
37+
import org.springframework.transaction.support.TransactionTemplate;
38+
39+
/**
40+
* @author Gary Russell
41+
* @since 1.3.5
42+
*
43+
*/
44+
public class DefaultKafkaProducerFactoryTests {
45+
46+
@SuppressWarnings({ "rawtypes", "unchecked" })
47+
@Test
48+
public void testProducerClosedAfterBadTransition() throws Exception {
49+
final Producer producer = mock(Producer.class);
50+
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
51+
52+
@Override
53+
protected Producer createTransactionalProducer() {
54+
producer.initTransactions();
55+
BlockingQueue<Producer> cache = getCache();
56+
Producer cached = cache.poll();
57+
return cached == null ? new CloseSafeProducer(producer, cache) : cached;
58+
}
59+
60+
};
61+
pf.setTransactionIdPrefix("foo");
62+
63+
final AtomicInteger flag = new AtomicInteger();
64+
willAnswer(i -> {
65+
if (flag.incrementAndGet() == 2) {
66+
throw new KafkaException("Invalid transition ...");
67+
}
68+
return null;
69+
}).given(producer).beginTransaction();
70+
71+
final KafkaTemplate kafkaTemplate = new KafkaTemplate(pf);
72+
KafkaTransactionManager tm = new KafkaTransactionManager(pf);
73+
TransactionTemplate transactionTemplate = new TransactionTemplate(tm);
74+
transactionTemplate.execute(s -> {
75+
kafkaTemplate.send("foo", "bar");
76+
return null;
77+
});
78+
BlockingQueue cache = KafkaTestUtils.getPropertyValue(pf, "cache", BlockingQueue.class);
79+
assertThat(cache).hasSize(1);
80+
try {
81+
transactionTemplate.execute(s -> {
82+
return null;
83+
});
84+
}
85+
catch (CannotCreateTransactionException e) {
86+
assertThat(e.getCause().getMessage()).contains("Invalid transition");
87+
}
88+
assertThat(cache).hasSize(0);
89+
90+
InOrder inOrder = inOrder(producer);
91+
inOrder.verify(producer).initTransactions();
92+
inOrder.verify(producer).beginTransaction();
93+
inOrder.verify(producer).send(any(), any());
94+
inOrder.verify(producer).commitTransaction();
95+
inOrder.verify(producer).beginTransaction();
96+
inOrder.verify(producer).close();
97+
inOrder.verifyNoMoreInteractions();
98+
pf.destroy();
99+
}
100+
101+
}

0 commit comments

Comments
 (0)