18 changes: 18 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/kafka.adoc
@@ -5532,6 +5532,24 @@ Starting with version 2.7, the recoverer checks that the partition selected by t
If the partition is not present, the partition in the `ProducerRecord` is set to `null`, allowing the `KafkaProducer` to select the partition.
You can disable this check by setting the `verifyPartition` property to `false`.
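
For example, a minimal sketch (assuming `template` is a suitably configured `KafkaTemplate`; the `verifyPartition` property is set via its standard setter):

====
[source, java]
----
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
// skip the partition-existence check; the partition chosen by the
// destination resolver is used even if it is not currently present
recoverer.setVerifyPartition(false);
----
====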

[[dlpr-headers]]
===== Managing Dead Letter Record Headers

Referring to <<dead-letters>> above, the `DeadLetterPublishingRecoverer` has two properties used to manage headers when those headers already exist (such as when reprocessing a dead letter record that failed, including when using <<retry-topic>>).

* `appendOriginalHeaders` (default `true`)
* `stripPreviousExceptionHeaders` (default `true` since version 2.8)

Apache Kafka supports multiple headers with the same name; to obtain the "latest" value, you can use `headers.lastHeader(headerName)`; to get an iterator over multiple headers, use `headers.headers(headerName).iterator()`.
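
For example, a minimal sketch of both accessors (using the `DLT_EXCEPTION_FQCN` header here; any header name works the same way):

====
[source, java]
----
Headers headers = record.headers();
// the last occurrence wins when the same header was added several times
Header latest = headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_FQCN);
// iterate over every occurrence, in insertion order
Iterator<Header> iterator = headers.headers(KafkaHeaders.DLT_EXCEPTION_FQCN).iterator();
while (iterator.hasNext()) {
    System.out.println(new String(iterator.next().value(), StandardCharsets.UTF_8));
}
----
====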

When repeatedly republishing a failed record, these headers can grow (and eventually cause publication to fail due to a `RecordTooLargeException`); this is especially true for the exception headers and particularly for the stack trace headers.

There are two properties because, while you might want to retain only the last exception information, you might want to retain the history of which topic(s) the record passed through for each failure.

`appendOriginalHeaders` is applied to all headers named `*ORIGINAL*` while `stripPreviousExceptionHeaders` is applied to all headers named `*EXCEPTION*`.
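
For example, a sketch that retains the complete history for both kinds of headers (again assuming `template` is a `KafkaTemplate`):

====
[source, java]
----
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setAppendOriginalHeaders(true); // keep every ORIGINAL* header (the default)
recoverer.setStripPreviousExceptionHeaders(false); // also keep earlier EXCEPTION* headers
----
====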

Also see <<retry-headers>>.

[[exp-backoff]]
===== `ExponentialBackOffWithMaxRetries` Implementation

27 changes: 27 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
@@ -357,6 +357,33 @@ public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo>

NOTE: By default the topics are autocreated with one partition and a replication factor of one.

[[retry-headers]]
===== Failure Header Management

When considering how to manage failure headers (original headers and exception headers), the framework delegates to the `DeadLetterPublishingRecoverer` to decide whether to append or replace the headers.

By default, it explicitly sets `appendOriginalHeaders` to `false` and leaves `stripPreviousExceptionHeaders` at the default used by the `DeadLetterPublishingRecoverer`.

This means that, currently, records published to multiple retry topics can grow large, especially when the stack trace is large.

See <<dlpr-headers>> for more information.

To reconfigure the framework to use different settings for these properties, replace the standard `DeadLetterPublishingRecovererFactory` bean, adding a `recovererCustomizer`:

====
[source, java]
----
@Bean(RetryTopicInternalBeanNames.DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME)
DeadLetterPublishingRecovererFactory factory(DestinationTopicResolver resolver) {
    DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(resolver);
    factory.setDeadLetterPublishingRecovererCustomizer(dlpr -> {
        dlpr.setAppendOriginalHeaders(true);
        dlpr.setStripPreviousExceptionHeaders(false);
    });
    return factory;
}
----
====

==== Topic Naming

7 changes: 7 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/whats-new.adoc
@@ -66,3 +66,10 @@ The `interceptBeforeTx` container property is now `true` by default.

The `DelegatingByTopicSerializer` and `DelegatingByTopicDeserializer` are now provided.
See <<delegating-serialization>> for more information.

[[x28-dlpr]]
==== `DeadLetterPublishingRecoverer` Changes

The property `stripPreviousExceptionHeaders` is now `true` by default.

See <<dlpr-headers>> for more information.
@@ -93,14 +93,16 @@ public class DeadLetterPublishingRecoverer extends ExceptionClassifier implement

private Duration waitForSendResultTimeout = Duration.ofSeconds(THIRTY);

private boolean replaceOriginalHeaders = true;
private boolean appendOriginalHeaders = true;

private boolean failIfSendResultIsError = true;

private boolean throwIfNoDestinationReturned = false;

private long timeoutBuffer = Duration.ofSeconds(FIVE).toMillis();

private boolean stripPreviousExceptionHeaders = true;

/**
* Create an instance with the provided template and a default destination resolving
* function that returns a TopicPartition based on the original topic (appended with ".DLT")
@@ -246,13 +248,27 @@ public void setPartitionInfoTimeout(Duration partitionInfoTimeout) {
}

/**
* Set to false if you don't want to replace the dead letter original headers if
* they are already present.
* Set to false if you don't want to append the current "original" headers (topic,
* partition etc.) if they are already present. When false, only the first "original"
* headers are retained.
* @param replaceOriginalHeaders set to false not to replace.
* @since 2.7
* @deprecated in favor of {@link #setAppendOriginalHeaders(boolean)}.
*/
@Deprecated
public void setReplaceOriginalHeaders(boolean replaceOriginalHeaders) {
this.replaceOriginalHeaders = replaceOriginalHeaders;
this.appendOriginalHeaders = replaceOriginalHeaders;
}

/**
* Set to false if you don't want to append the current "original" headers (topic,
* partition etc.) if they are already present. When false, only the first "original"
* headers are retained.
* @param appendOriginalHeaders set to false not to append.
* @since 2.7.9
*/
public void setAppendOriginalHeaders(boolean appendOriginalHeaders) {
this.appendOriginalHeaders = appendOriginalHeaders;
}

/**
@@ -299,6 +315,18 @@ public void setTimeoutBuffer(long buffer) {
this.timeoutBuffer = buffer;
}

/**
* Set to false to retain previous exception headers as well as headers for the
* current exception. Default is true, which means only the current headers are
* retained; setting it to false can cause the record size to grow when a record
* is republished many times.
* @param stripPreviousExceptionHeaders false to retain all.
* @since 2.7.9
*/
public void setStripPreviousExceptionHeaders(boolean stripPreviousExceptionHeaders) {
this.stripPreviousExceptionHeaders = stripPreviousExceptionHeaders;
}

@SuppressWarnings("unchecked")
@Override
public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consumer, Exception exception) {
@@ -563,32 +591,39 @@ private void maybeAddOriginalHeaders(Headers kafkaHeaders, ConsumerRecord<?, ?>
}

private void maybeAddHeader(Headers kafkaHeaders, String header, byte[] value) {
if (this.replaceOriginalHeaders || kafkaHeaders.lastHeader(header) == null) {
if (this.appendOriginalHeaders || kafkaHeaders.lastHeader(header) == null) {
kafkaHeaders.add(header, value);
}
}

void addExceptionInfoHeaders(Headers kafkaHeaders, Exception exception, boolean isKey) {
kafkaHeaders.add(new RecordHeader(isKey ? this.headerNames.exceptionInfo.keyExceptionFqcn
appendOrReplace(kafkaHeaders, new RecordHeader(isKey ? this.headerNames.exceptionInfo.keyExceptionFqcn
: this.headerNames.exceptionInfo.exceptionFqcn,
exception.getClass().getName().getBytes(StandardCharsets.UTF_8)));
if (!isKey && exception.getCause() != null) {
kafkaHeaders.add(new RecordHeader(this.headerNames.exceptionInfo.exceptionCauseFqcn,
appendOrReplace(kafkaHeaders, new RecordHeader(this.headerNames.exceptionInfo.exceptionCauseFqcn,
exception.getCause().getClass().getName().getBytes(StandardCharsets.UTF_8)));
}
String message = exception.getMessage();
if (message != null) {
kafkaHeaders.add(new RecordHeader(isKey
appendOrReplace(kafkaHeaders, new RecordHeader(isKey
? this.headerNames.exceptionInfo.keyExceptionMessage
: this.headerNames.exceptionInfo.exceptionMessage,
exception.getMessage().getBytes(StandardCharsets.UTF_8)));
}
kafkaHeaders.add(new RecordHeader(isKey
appendOrReplace(kafkaHeaders, new RecordHeader(isKey
? this.headerNames.exceptionInfo.keyExceptionStacktrace
: this.headerNames.exceptionInfo.exceptionStacktrace,
getStackTraceAsString(exception).getBytes(StandardCharsets.UTF_8)));
}

private void appendOrReplace(Headers headers, RecordHeader header) {
if (this.stripPreviousExceptionHeaders) {
headers.remove(header.key());
}
headers.add(header);
}

private String getStackTraceAsString(Throwable cause) {
StringWriter stringWriter = new StringWriter();
PrintWriter printWriter = new PrintWriter(stringWriter, true);
@@ -137,7 +137,7 @@ protected DeadLetterPublishingRecoverer.HeaderNames getHeaderNames() {

recoverer.setHeadersFunction((consumerRecord, e) -> addHeaders(consumerRecord, e, getAttempts(consumerRecord)));
recoverer.setFailIfSendResultIsError(true);
recoverer.setReplaceOriginalHeaders(false);
recoverer.setAppendOriginalHeaders(false);
recoverer.setThrowIfNoDestinationReturned(false);
this.recovererCustomizer.accept(recoverer);
this.fatalExceptions.forEach(ex -> recoverer.addNotRetryableExceptions(ex));
@@ -81,7 +81,7 @@ private void registerBeans() {
DefaultDestinationTopicProcessor.class);
registerIfNotContains(RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME,
ListenerContainerFactoryConfigurer.class);
registerIfNotContains(RetryTopicInternalBeanNames.DEAD_LETTER_PUBLISHING_RECOVERER_PROVIDER_NAME,
registerIfNotContains(RetryTopicInternalBeanNames.DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME,
DeadLetterPublishingRecovererFactory.class);
registerIfNotContains(RetryTopicInternalBeanNames.RETRY_TOPIC_CONFIGURER, RetryTopicConfigurer.class);
registerIfNotContains(RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME,
@@ -52,7 +52,16 @@ public abstract class RetryTopicInternalBeanNames {
/**
* {@link DeadLetterPublishingRecovererFactory} bean name.
*/
public static final String DEAD_LETTER_PUBLISHING_RECOVERER_PROVIDER_NAME = "internalDeadLetterPublishingRecovererProvider";
public static final String DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME =
"internalDeadLetterPublishingRecovererProvider";

/**
* {@link DeadLetterPublishingRecovererFactory} bean name.
* @deprecated in favor of {@link #DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME}
*/
@Deprecated
public static final String DEAD_LETTER_PUBLISHING_RECOVERER_PROVIDER_NAME =
DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME;

/**
* {@link DestinationTopicContainer} bean name.
@@ -38,6 +38,7 @@
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
@@ -331,15 +332,15 @@ void allOriginalHeaders() {

@SuppressWarnings({"unchecked", "rawtypes"})
@Test
void dontReplaceOriginalHeaders() {
void dontAppendOriginalHeaders() {
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
ListenableFuture future = mock(ListenableFuture.class);
given(template.send(any(ProducerRecord.class))).willReturn(future);
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, 1234L,
TimestampType.CREATE_TIME, 123, 123, "bar", null, new RecordHeaders(), Optional.empty());
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setReplaceOriginalHeaders(false);
recoverer.accept(record, new RuntimeException());
recoverer.setAppendOriginalHeaders(false);
recoverer.accept(record, new RuntimeException(new IllegalStateException()));
ArgumentCaptor<ProducerRecord> producerRecordCaptor = ArgumentCaptor.forClass(ProducerRecord.class);
then(template).should(times(1)).send(producerRecordCaptor.capture());
Headers headers = producerRecordCaptor.getValue().headers();
@@ -348,32 +349,50 @@ void dontReplaceOriginalHeaders() {
Header originalOffsetHeader = headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET);
Header originalTimestampHeader = headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP);
Header originalTimestampType = headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE);
Header firstExceptionType = headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_FQCN);
Header firstExceptionCauseType = headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN);
Header firstExceptionMessage = headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE);
Header firstExceptionStackTrace = headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE);

ConsumerRecord<String, String> anotherRecord = new ConsumerRecord<>("bar", 1, 12L, 4321L,
TimestampType.LOG_APPEND_TIME, 321, 321, "bar", null, new RecordHeaders(), Optional.empty());
headers.forEach(header -> anotherRecord.headers().add(header));
recoverer.accept(anotherRecord, new RuntimeException());
recoverer.accept(anotherRecord, new RuntimeException(new IllegalStateException()));
ArgumentCaptor<ProducerRecord> anotherProducerRecordCaptor = ArgumentCaptor.forClass(ProducerRecord.class);
then(template).should(times(2)).send(producerRecordCaptor.capture());
Headers anotherHeaders = producerRecordCaptor.getAllValues().get(1).headers();
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)).isEqualTo(originalTopicHeader);
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)).isEqualTo(originalPartitionHeader);
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)).isEqualTo(originalOffsetHeader);
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP)).isEqualTo(originalTimestampHeader);
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)).isEqualTo(originalTimestampType);
then(template).should(times(2)).send(anotherProducerRecordCaptor.capture());
Headers anotherHeaders = anotherProducerRecordCaptor.getAllValues().get(1).headers();
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)).isSameAs(originalTopicHeader);
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)).isSameAs(originalPartitionHeader);
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)).isSameAs(originalOffsetHeader);
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP)).isSameAs(originalTimestampHeader);
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE))
.isSameAs(originalTimestampType);
Iterator<Header> originalTopics = anotherHeaders.headers(KafkaHeaders.DLT_ORIGINAL_TOPIC).iterator();
assertThat(originalTopics.next()).isSameAs(originalTopicHeader);
assertThat(originalTopics.hasNext()).isFalse();
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_EXCEPTION_FQCN)).isNotSameAs(firstExceptionType);
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN))
.isNotSameAs(firstExceptionCauseType);
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE)).isNotSameAs(firstExceptionMessage);
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE))
.isNotSameAs(firstExceptionStackTrace);
Iterator<Header> exceptionHeaders = anotherHeaders.headers(KafkaHeaders.DLT_EXCEPTION_FQCN).iterator();
assertThat(exceptionHeaders.next()).isNotSameAs(firstExceptionType);
assertThat(exceptionHeaders.hasNext()).isFalse();
}

@SuppressWarnings({"unchecked", "rawtypes"})
@Test
void replaceOriginalHeaders() {
void appendOriginalHeaders() {
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
ListenableFuture future = mock(ListenableFuture.class);
given(template.send(any(ProducerRecord.class))).willReturn(future);
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, 1234L,
TimestampType.CREATE_TIME, 123, 123, "bar", null, new RecordHeaders(), Optional.empty());
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setReplaceOriginalHeaders(true);
recoverer.accept(record, new RuntimeException());
recoverer.setAppendOriginalHeaders(true);
recoverer.setStripPreviousExceptionHeaders(false);
recoverer.accept(record, new RuntimeException(new IllegalStateException()));
ArgumentCaptor<ProducerRecord> producerRecordCaptor = ArgumentCaptor.forClass(ProducerRecord.class);
then(template).should(times(1)).send(producerRecordCaptor.capture());
Headers headers = producerRecordCaptor.getValue().headers();
@@ -382,19 +401,40 @@ void replaceOriginalHeaders() {
Header originalOffsetHeader = headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET);
Header originalTimestampHeader = headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP);
Header originalTimestampType = headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE);
Header firstExceptionType = headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_FQCN);
Header firstExceptionCauseType = headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN);
Header firstExceptionMessage = headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE);
Header firstExceptionStackTrace = headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE);

ConsumerRecord<String, String> anotherRecord = new ConsumerRecord<>("bar", 1, 12L, 4321L,
TimestampType.LOG_APPEND_TIME, 321, 321, "bar", null, new RecordHeaders(), Optional.empty());
headers.forEach(header -> anotherRecord.headers().add(header));
recoverer.accept(anotherRecord, new RuntimeException());
recoverer.accept(anotherRecord, new RuntimeException(new IllegalStateException()));
ArgumentCaptor<ProducerRecord> anotherProducerRecordCaptor = ArgumentCaptor.forClass(ProducerRecord.class);
then(template).should(times(2)).send(anotherProducerRecordCaptor.capture());
Headers anotherHeaders = anotherProducerRecordCaptor.getAllValues().get(1).headers();
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)).isNotEqualTo(originalTopicHeader);
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)).isNotEqualTo(originalPartitionHeader);
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)).isNotEqualTo(originalOffsetHeader);
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP)).isNotEqualTo(originalTimestampHeader);
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)).isNotEqualTo(originalTimestampType);
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)).isNotSameAs(originalTopicHeader);
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION))
.isNotSameAs(originalPartitionHeader);
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)).isNotSameAs(originalOffsetHeader);
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP))
.isNotSameAs(originalTimestampHeader);
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE))
.isNotSameAs(originalTimestampType);
Iterator<Header> originalTopics = anotherHeaders.headers(KafkaHeaders.DLT_ORIGINAL_TOPIC).iterator();
assertThat(originalTopics.next()).isSameAs(originalTopicHeader);
assertThat(originalTopics.next()).isNotNull();
assertThat(originalTopics.hasNext()).isFalse();
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_EXCEPTION_FQCN)).isNotSameAs(firstExceptionType);
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN))
.isNotSameAs(firstExceptionCauseType);
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE)).isNotSameAs(firstExceptionMessage);
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE))
.isNotSameAs(firstExceptionStackTrace);
Iterator<Header> exceptionHeaders = anotherHeaders.headers(KafkaHeaders.DLT_EXCEPTION_FQCN).iterator();
assertThat(exceptionHeaders.next()).isSameAs(firstExceptionType);
assertThat(exceptionHeaders.next()).isNotNull();
assertThat(exceptionHeaders.hasNext()).isFalse();
}

@SuppressWarnings({ "unchecked", "rawtypes" })