-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Closed
Description
The documentation for KafkaProducer
has an example for exception handling for transactions:
https://kafka.apache.org/11/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
producer.initTransactions();
try {
producer.beginTransaction();
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
producer.abortTransaction();
}
producer.close();
We noticed some issues when using Kafka transactions in our application we believe are related to slightly incorrect exception handling in Spring Kafka:
- When synchronizing the Kafka transaction with another transaction manager (e.g.
DataSourceTransactionManager
), if an exception occurs when callingcommitTransaction
orabortTransaction
, the Kafka producer is leaked. - If an exception occurs when calling
commitTransaction
orabortTransaction
the Kafka producer can be reused throughDefaultKafkaProducerFactory
despite it being possibly in a fatal failure state. - If an exception occurs when calling
beginTransaction
and not usingDefaultKafkaProducerFactory
as the producer factory, the Kafka producer can be leaked.
Metadata
Metadata
Assignees
Labels
No labels