
Commit 7d4af6f

garyrussell authored and artembilan committed
GH-767: Add DeadLetterPublishingRecoverer
Resolves #767

Publish failed records to a dead-letter topic.

* Add try/catch
* Polishing - PR Comments; allow user to modify the ProducerRecord.
* Typo
1 parent 9ae454a commit 7d4af6f
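
For orientation, a minimal usage sketch (not part of this commit): the recoverer is plugged into a SeekToCurrentErrorHandler so that, once retries are exhausted, the failed record is published to the dead-letter topic. The variables pf and container are assumed to be configured elsewhere, as in the test further below.

// Usage sketch only: 'pf' is an existing DefaultKafkaProducerFactory<Object, Object> and
// 'container' an existing KafkaMessageListenerContainer; neither is created here.
KafkaTemplate<Object, Object> template = new KafkaTemplate<>(pf);

// Default destination resolver: publish to "<original topic>.DLT" on the same partition.
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);

// After 3 failed delivery attempts the error handler invokes the recoverer, which copies
// the key/value, adds the DLT_* headers, and sends the record to the dead-letter topic.
container.setErrorHandler(new SeekToCurrentErrorHandler(recoverer, 3));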

File tree: 6 files changed, +321 -17 lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 164 additions & 0 deletions
@@ -0,0 +1,164 @@
/*
 * Copyright 2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.listener;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;

import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.util.Assert;

/**
 * A {@link BiConsumer} that publishes a failed record to a dead-letter topic.
 *
 * @author Gary Russell
 * @since 2.2
 *
 */
public class DeadLetterPublishingRecoverer implements BiConsumer<ConsumerRecord<?, ?>, Exception> {

	private static final Log logger = LogFactory.getLog(DeadLetterPublishingRecoverer.class);

	private final KafkaTemplate<Object, Object> template;

	private final boolean transactional;

	private final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver;

	/**
	 * Create an instance with the provided template and a default destination resolving
	 * function that returns a TopicPartition based on the original topic (appended with ".DLT")
	 * from the failed record, and the same partition as the failed record. Therefore the
	 * dead-letter topic must have at least as many partitions as the original topic.
	 * @param template the {@link KafkaTemplate} to use for publishing.
	 */
	public DeadLetterPublishingRecoverer(KafkaTemplate<Object, Object> template) {
		this(template, (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition()));
	}

	/**
	 * Create an instance with the provided template and destination resolving function,
	 * that receives the failed consumer record and the exception and returns a
	 * {@link TopicPartition}. If the partition in the {@link TopicPartition} is < 0, no
	 * partition is set when publishing to the topic.
	 * @param template the {@link KafkaTemplate} to use for publishing.
	 * @param destinationResolver the resolving function.
	 */
	public DeadLetterPublishingRecoverer(KafkaTemplate<Object, Object> template,
			BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {

		Assert.notNull(template, "The template cannot be null");
		Assert.notNull(destinationResolver, "The destinationResolver cannot be null");
		this.template = template;
		this.transactional = template.isTransactional();
		this.destinationResolver = destinationResolver;
	}

	@Override
	public void accept(ConsumerRecord<?, ?> record, Exception exception) {
		TopicPartition tp = this.destinationResolver.apply(record, exception);
		RecordHeaders headers = new RecordHeaders(record.headers().toArray());
		enhanceHeaders(headers, record, exception);
		ProducerRecord<Object, Object> outRecord = createProducerRecord(record, tp, headers);
		if (this.transactional) {
			this.template.executeInTransaction(t -> {
				publish(outRecord, t);
				return null;
			});
		}
		else {
			publish(outRecord, this.template);
		}
	}

	/**
	 * Subclasses can override this method to customize the producer record to send to the DLQ.
	 * The default implementation simply copies the key and value from the consumer record
	 * and adds the headers. The timestamp is not set (the original timestamp is in one of
	 * the headers).
	 * IMPORTANT: if the partition in the {@link TopicPartition} is < 0, it must be set to null
	 * in the {@link ProducerRecord}.
	 * @param record the failed record
	 * @param topicPartition the {@link TopicPartition} returned by the destination resolver.
	 * @param headers the headers - original record headers plus DLT headers.
	 * @return the producer record to send.
	 * @see KafkaHeaders
	 */
	protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
			TopicPartition topicPartition, RecordHeaders headers) {

		return new ProducerRecord<>(topicPartition.topic(),
				topicPartition.partition() < 0 ? null : topicPartition.partition(),
				record.key(), record.value(), headers);
	}

	private void publish(ProducerRecord<Object, Object> outRecord, KafkaOperations<Object, Object> template) {
		try {
			template.send(outRecord).addCallback(result -> {
				if (logger.isDebugEnabled()) {
					logger.debug("Successful dead-letter publication: " + result);
				}
			}, ex -> {
				logger.error("Dead-letter publication failed for: " + outRecord, ex);
			});
		}
		catch (Exception e) {
			logger.error("Dead-letter publication failed for: " + outRecord, e);
		}
	}

	private void enhanceHeaders(RecordHeaders kafkaHeaders, ConsumerRecord<?, ?> record, Exception exception) {
		kafkaHeaders.add(
				new RecordHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC, record.topic().getBytes(StandardCharsets.UTF_8)));
		kafkaHeaders.add(new RecordHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION,
				ByteBuffer.allocate(Integer.BYTES).putInt(record.partition()).array()));
		kafkaHeaders.add(new RecordHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET,
				ByteBuffer.allocate(Long.BYTES).putLong(record.offset()).array()));
		kafkaHeaders.add(new RecordHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP,
				ByteBuffer.allocate(Long.BYTES).putLong(record.timestamp()).array()));
		kafkaHeaders.add(new RecordHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE,
				record.timestampType().toString().getBytes(StandardCharsets.UTF_8)));
		kafkaHeaders.add(new RecordHeader(KafkaHeaders.DLT_EXCEPTION_FQCN,
				exception.getClass().getName().getBytes(StandardCharsets.UTF_8)));
		kafkaHeaders.add(new RecordHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE,
				exception.getMessage().getBytes(StandardCharsets.UTF_8)));
		kafkaHeaders.add(new RecordHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE,
				getStackTraceAsString(exception).getBytes(StandardCharsets.UTF_8)));
	}

	private String getStackTraceAsString(Throwable cause) {
		StringWriter stringWriter = new StringWriter();
		PrintWriter printWriter = new PrintWriter(stringWriter, true);
		cause.printStackTrace(printWriter);
		return stringWriter.getBuffer().toString();
	}

}
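
The second constructor takes a destination-resolving BiFunction; a short sketch of that variant follows (the topic name "all-failures" is illustrative only), relying on the documented behavior that a negative partition leaves the partition unset so the producer chooses one.

// Sketch only: route every failure to a single topic (name is hypothetical) and let the
// producer pick the partition by returning a negative partition from the resolver.
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
		(consumerRecord, exception) -> new TopicPartition("all-failures", -1));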

spring-kafka/src/main/java/org/springframework/kafka/support/KafkaHeaders.java

Lines changed: 48 additions & 0 deletions
@@ -133,4 +133,52 @@ public abstract class KafkaHeaders {
 	 */
 	public static final String REPLY_PARTITION = PREFIX + "replyPartition";
 
+	/**
+	 * Exception class name for a record published to a dead-letter topic.
+	 * @since 2.2
+	 */
+	public static final String DLT_EXCEPTION_FQCN = PREFIX + "dlt-exception-fqcn";
+
+	/**
+	 * Exception stack trace for a record published to a dead-letter topic.
+	 * @since 2.2
+	 */
+	public static final String DLT_EXCEPTION_STACKTRACE = PREFIX + "dlt-exception-stacktrace";
+
+	/**
+	 * Exception message for a record published to a dead-letter topic.
+	 * @since 2.2
+	 */
+	public static final String DLT_EXCEPTION_MESSAGE = PREFIX + "dlt-exception-message";
+
+	/**
+	 * Original topic for a record published to a dead-letter topic.
+	 * @since 2.2
+	 */
+	public static final String DLT_ORIGINAL_TOPIC = PREFIX + "dlt-original-topic";
+
+	/**
+	 * Original partition for a record published to a dead-letter topic.
+	 * @since 2.2
+	 */
+	public static final String DLT_ORIGINAL_PARTITION = PREFIX + "dlt-original-partition";
+
+	/**
+	 * Original offset for a record published to a dead-letter topic.
+	 * @since 2.2
+	 */
+	public static final String DLT_ORIGINAL_OFFSET = PREFIX + "dlt-original-offset";
+
+	/**
+	 * Original timestamp for a record published to a dead-letter topic.
+	 * @since 2.2
+	 */
+	public static final String DLT_ORIGINAL_TIMESTAMP = PREFIX + "dlt-original-timestamp";
+
+	/**
+	 * Original timestamp type for a record published to a dead-letter topic.
+	 * @since 2.2
+	 */
+	public static final String DLT_ORIGINAL_TIMESTAMP_TYPE = PREFIX + "dlt-original-timestamp-type";
+
 }
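
For completeness, a sketch (not part of this commit) of how a consumer of the dead-letter topic could decode these headers, following the encodings used by enhanceHeaders() above: UTF-8 bytes for the strings, a 4-byte int for the partition, and 8-byte longs for the offset and timestamp. The dltRecord variable and the usual ByteBuffer/StandardCharsets/Headers imports are assumed.

// Sketch only: 'dltRecord' is assumed to be a ConsumerRecord<?, ?> polled from the DLT.
Headers headers = dltRecord.headers();
String originalTopic = new String(
		headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC).value(), StandardCharsets.UTF_8);
int originalPartition = ByteBuffer.wrap(
		headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION).value()).getInt();
long originalOffset = ByteBuffer.wrap(
		headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET).value()).getLong();
String exceptionMessage = new String(
		headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE).value(), StandardCharsets.UTF_8);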

spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentRecovererTests.java

Lines changed: 21 additions & 8 deletions
@@ -24,11 +24,12 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiConsumer;
 
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
 import org.junit.ClassRule;
 import org.junit.Test;
 
@@ -49,8 +50,10 @@ public class SeekToCurrentRecovererTests {
 
 	private static String topic1 = "seekTopic1";
 
+	private static String topic1DLT = "seekTopic1.FOO";
+
 	@ClassRule
-	public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, topic1);
+	public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, topic1, topic1DLT);
 
 	private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
 
@@ -66,8 +69,8 @@ public void testMaxFailures() throws Exception {
 
 		Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
 		senderProps.put(ProducerConfig.RETRIES_CONFIG, 1);
-		DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
-		final KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
+		DefaultKafkaProducerFactory<Object, Object> pf = new DefaultKafkaProducerFactory<>(senderProps);
+		final KafkaTemplate<Object, Object> template = new KafkaTemplate<>(pf);
 		final CountDownLatch latch = new CountDownLatch(1);
 		AtomicReference<String> data = new AtomicReference<>();
 		containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
@@ -82,11 +85,17 @@ public void testMaxFailures() throws Exception {
 				new KafkaMessageListenerContainer<>(cf, containerProps);
 		container.setBeanName("testSeekMaxFailures");
 		final CountDownLatch recoverLatch = new CountDownLatch(1);
-		BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer = (r, t) -> {
-			recoverLatch.countDown();
+		DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
+				(r, e) -> new TopicPartition(topic1DLT, r.partition())) {
+
+			@Override
+			public void accept(ConsumerRecord<?, ?> record, Exception exception) {
+				super.accept(record, exception);
+				recoverLatch.countDown();
+			}
+
 		};
-		SeekToCurrentErrorHandler errorHandler =
-				spy(new SeekToCurrentErrorHandler(recoverer, 3));
+		SeekToCurrentErrorHandler errorHandler = spy(new SeekToCurrentErrorHandler(recoverer, 3));
 		container.setErrorHandler(errorHandler);
 		final CountDownLatch stopLatch = new CountDownLatch(1);
 		container.setApplicationEventPublisher(e -> {
@@ -103,6 +112,10 @@ public void testMaxFailures() throws Exception {
 		assertThat(data.get()).isEqualTo("bar");
 		assertThat(recoverLatch.await(10, TimeUnit.SECONDS)).isTrue();
 		container.stop();
+		Consumer<Integer, String> consumer = cf.createConsumer();
+		embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic1DLT);
+		ConsumerRecord<Integer, String> dltRecord = KafkaTestUtils.getSingleRecord(consumer, topic1DLT);
+		assertThat(dltRecord.value()).isEqualTo("foo");
 		pf.destroy();
 		assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
 		verify(errorHandler).clearThreadState();
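
The commit message notes that a user can modify the ProducerRecord sent to the DLT; here is a sketch of that extension point, overriding the protected createProducerRecord() hook shown in the new class above. The subclass name and the extra header are hypothetical.

// Hypothetical subclass: adds one extra header before the record is published.
class AuditingRecoverer extends DeadLetterPublishingRecoverer {

	AuditingRecoverer(KafkaTemplate<Object, Object> template) {
		super(template);
	}

	@Override
	protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
			TopicPartition topicPartition, RecordHeaders headers) {

		// Illustrative customization; key, value, and topic could be changed here as well.
		headers.add(new RecordHeader("x-dead-lettered-by",
				"AuditingRecoverer".getBytes(StandardCharsets.UTF_8)));
		// Per the method's Javadoc: a negative partition from the resolver must map to null.
		return new ProducerRecord<>(topicPartition.topic(),
				topicPartition.partition() < 0 ? null : topicPartition.partition(),
				record.key(), record.value(), headers);
	}

}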
