Skip to content

GH-2528: DLPR - Support Header Replacement #2529

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,9 @@ protected void configureCustomizers(CustomizersConfigurer customizersConfigurer)

Starting with version 2.8.4, if you wish to add custom headers (in addition to the retry information headers added by the factory), you can add a `headersFunction` to the factory - `factory.setHeadersFunction((rec, ex) -> { ... })`.

By default, any headers added will be cumulative - Kafka headers can contain multiple values.
Starting with version 2.9.5, if the `Headers` returned by the function contains a header of type `DeadLetterPublishingRecoverer.SingleRecordHeader`, then any existing values for that header will be removed and only the new single value will remain.

[[retry-topic-combine-blocking]]
==== Combining Blocking and Non-Blocking Retries

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 the original author or authors.
* Copyright 2018-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -40,7 +40,9 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;

import org.springframework.core.log.LogAccessor;
Expand Down Expand Up @@ -224,7 +226,9 @@ public void setRetainExceptionHeader(boolean retainExceptionHeader) {

/**
* Set a function which will be called to obtain additional headers to add to the
* published record.
* published record. If a {@link Header} returned is an instance of
* {@link SingleRecordHeader}, then that header will replace any existing header of
* that name, rather than being appended as a new value.
* @param headersFunction the headers function.
* @since 2.5.4
* @see #addHeadersFunction(BiFunction)
Expand Down Expand Up @@ -411,7 +415,10 @@ public void includeHeader(HeaderNames.HeadersToAdd... headers) {
/**
* Add a function which will be called to obtain additional headers to add to the
* published record. Functions are called in the order that they are added, and after
* any function passed into {@link #setHeadersFunction(BiFunction)}.
* any function passed into {@link #setHeadersFunction(BiFunction)}. If a
* {@link Header} returned is an instance of {@link SingleRecordHeader}, then that
* header will replace any existing header of that name, rather than being appended as
* a new value.
* @param headersFunction the headers function.
* @since 2.8.4
* @see #setHeadersFunction(BiFunction)
Expand Down Expand Up @@ -707,7 +714,12 @@ private void enhanceHeaders(Headers kafkaHeaders, ConsumerRecord<?, ?> record, E
maybeAddOriginalHeaders(kafkaHeaders, record, exception);
Headers headers = this.headersFunction.apply(record, exception);
if (headers != null) {
headers.forEach(kafkaHeaders::add);
headers.forEach(header -> {
if (header instanceof SingleRecordHeader) {
kafkaHeaders.remove(header.key());
}
kafkaHeaders.add(header);
});
}
}

Expand Down Expand Up @@ -1374,4 +1386,34 @@ public interface ExceptionHeadersCreator {

}

/**
 * A {@link Header} that indicates that this header should replace any existing headers
 * with this name, rather than being appended to the headers, which is the normal behavior.
 * The replacement is performed when the header is returned from a headers function; the
 * class itself carries no extra state beyond {@link RecordHeader}.
 *
 * @since 2.9.5
 * @see DeadLetterPublishingRecoverer#setHeadersFunction(BiFunction)
 * @see DeadLetterPublishingRecoverer#addHeadersFunction(BiFunction)
 */
public static class SingleRecordHeader extends RecordHeader {

	/**
	 * Construct an instance with the supplied key and value.
	 * @param key the key.
	 * @param value the value.
	 */
	public SingleRecordHeader(String key, byte[] value) {
		super(key, value);
	}

	/**
	 * Construct an instance with the supplied key and value buffers.
	 * @param keyBuffer the key buffer.
	 * @param valueBuffer the value buffer.
	 */
	public SingleRecordHeader(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
		super(keyBuffer, valueBuffer);
	}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 the original author or authors.
* Copyright 2020-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -68,6 +68,7 @@
import org.springframework.kafka.core.KafkaOperations.OperationsCallback;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer.HeaderNames;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer.SingleRecordHeader;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.converter.ConversionException;
Expand Down Expand Up @@ -878,6 +879,33 @@ void immutableHeaders() {
assertThat(KafkaTestUtils.getPropertyValue(headers, "headers", List.class)).hasSize(12);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
void replaceNotAppendHeader() {
	// Verify that a SingleRecordHeader returned by the headers function replaces the
	// existing header value instead of being appended as an additional value.
	KafkaOperations<?, ?> template = mock(KafkaOperations.class);
	CompletableFuture sendFuture = mock(CompletableFuture.class);
	given(template.send(any(ProducerRecord.class))).willReturn(sendFuture);
	DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
	recoverer.setHeadersFunction((rec, ex) -> new RecordHeaders(
			new RecordHeader[] { new SingleRecordHeader("foo", "one".getBytes()) }));
	Headers originalHeaders = new RecordHeaders().add(new RecordHeader("foo", "orig".getBytes()));
	ConsumerRecord<String, String> inbound = new ConsumerRecord<>("foo", 0, 0L, 0L,
			TimestampType.NO_TIMESTAMP_TYPE, -1, -1, null, "bar", originalHeaders, Optional.empty());
	recoverer.accept(inbound, new ListenerExecutionFailedException("test", "group", new RuntimeException()));
	ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
	verify(template).send(captor.capture());
	ProducerRecord published = captor.getValue();
	Headers outHeaders = published.headers();
	// 10 standard DLT headers plus the single replaced "foo" header
	assertThat(KafkaTestUtils.getPropertyValue(outHeaders, "headers", List.class)).hasSize(11);
	Iterator<Header> fooValues = outHeaders.headers("foo").iterator();
	assertThat(fooValues.hasNext()).isTrue();
	assertThat(fooValues.next().value()).isEqualTo("one".getBytes());
	assertThat(fooValues.hasNext()).isFalse();
}

@SuppressWarnings("unchecked")
@Test
void nonCompliantProducerFactory() throws Exception {
Expand Down