diff --git a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc index 57fbef6f07..1962bb0f96 100644 --- a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc +++ b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc @@ -621,6 +621,9 @@ protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) Starting with version 2.8.4, if you wish to add custom headers (in addition to the retry information headers added by the factory, you can add a `headersFunction` to the factory - `factory.setHeadersFunction((rec, ex) -> { ... })` +By default, any headers added will be cumulative - Kafka headers can contain multiple values. +Starting with version 2.9.5, if the `Headers` returned by the function contains a header of type `DeadLetterPublishingRecoverer.SingleRecordHeader`, then any existing values for that header will be removed and only the new single value will remain. + [[retry-topic-combine-blocking]] ==== Combining Blocking and Non-Blocking Retries diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java index 1488c5ee84..52633a7694 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -40,7 +40,9 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; import org.springframework.core.log.LogAccessor; @@ -224,7 +226,9 @@ public void setRetainExceptionHeader(boolean retainExceptionHeader) { /** * Set a function which will be called to obtain additional headers to add to the - * published record. + * published record. If a {@link Header} returned is an instance of + * {@link SingleRecordHeader}, then that header will replace any existing header of + * that name, rather than being appended as a new value. * @param headersFunction the headers function. * @since 2.5.4 * @see #addHeadersFunction(BiFunction) @@ -411,7 +415,10 @@ public void includeHeader(HeaderNames.HeadersToAdd... headers) { /** * Add a function which will be called to obtain additional headers to add to the * published record. Functions are called in the order that they are added, and after - * any function passed into {@link #setHeadersFunction(BiFunction)}. + * any function passed into {@link #setHeadersFunction(BiFunction)}. If a + * {@link Header} returned is an instance of {@link SingleRecordHeader}, then that + * header will replace any existing header of that name, rather than being appended as + * a new value. * @param headersFunction the headers function. 
* @since 2.8.4 * @see #setHeadersFunction(BiFunction) @@ -707,7 +714,12 @@ private void enhanceHeaders(Headers kafkaHeaders, ConsumerRecord record, E maybeAddOriginalHeaders(kafkaHeaders, record, exception); Headers headers = this.headersFunction.apply(record, exception); if (headers != null) { - headers.forEach(kafkaHeaders::add); + headers.forEach(header -> { + if (header instanceof SingleRecordHeader) { + kafkaHeaders.remove(header.key()); + } + kafkaHeaders.add(header); + }); } } @@ -1374,4 +1386,34 @@ public interface ExceptionHeadersCreator { } + /** + * A {@link Header} that indicates that this header should replace any existing headers + * with this name, rather than being appended to the headers, which is the normal behavior. + * + * @since 2.9.5 + * @see DeadLetterPublishingRecoverer#setHeadersFunction(BiFunction) + * @see DeadLetterPublishingRecoverer#addHeadersFunction(BiFunction) + */ + public static class SingleRecordHeader extends RecordHeader { + + /** + * Construct an instance. + * @param key the key. + * @param value the value. + */ + public SingleRecordHeader(String key, byte[] value) { + super(key, value); + } + + /** + * Construct an instance. + * @param keyBuffer the key buffer. + * @param valueBuffer the value buffer. + */ + public SingleRecordHeader(ByteBuffer keyBuffer, ByteBuffer valueBuffer) { + super(keyBuffer, valueBuffer); + } + + } + } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java index 5cc4a4f513..05ff753b2b 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 the original author or authors. + * Copyright 2020-2023 the original author or authors. 
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -68,6 +68,7 @@ import org.springframework.kafka.core.KafkaOperations.OperationsCallback; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.DeadLetterPublishingRecoverer.HeaderNames; +import org.springframework.kafka.listener.DeadLetterPublishingRecoverer.SingleRecordHeader; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.SendResult; import org.springframework.kafka.support.converter.ConversionException; @@ -878,6 +879,33 @@ void immutableHeaders() { assertThat(KafkaTestUtils.getPropertyValue(headers, "headers", List.class)).hasSize(12); } + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + void replaceNotAppendHeader() { + KafkaOperations template = mock(KafkaOperations.class); + CompletableFuture future = mock(CompletableFuture.class); + given(template.send(any(ProducerRecord.class))).willReturn(future); + Headers headers = new RecordHeaders().add(new RecordHeader("foo", "orig".getBytes())); + ConsumerRecord record = new ConsumerRecord<>("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, + -1, -1, null, "bar", headers, Optional.empty()); + DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); + recoverer.setHeadersFunction((rec, ex) -> { + RecordHeaders toReplace = new RecordHeaders( + new RecordHeader[] { new SingleRecordHeader("foo", "one".getBytes()) }); + return toReplace; + }); + recoverer.accept(record, new ListenerExecutionFailedException("test", "group", new RuntimeException())); + ArgumentCaptor producerRecordCaptor = ArgumentCaptor.forClass(ProducerRecord.class); + verify(template).send(producerRecordCaptor.capture()); + ProducerRecord outRecord = producerRecordCaptor.getValue(); + Headers outHeaders = outRecord.headers(); + assertThat(KafkaTestUtils.getPropertyValue(outHeaders, 
"headers", List.class)).hasSize(11);
+		Iterator<Header> iterator = outHeaders.headers("foo").iterator();
+		assertThat(iterator.hasNext()).isTrue();
+		assertThat(iterator.next().value()).isEqualTo("one".getBytes());
+		assertThat(iterator.hasNext()).isFalse();
+	}
+
 	@SuppressWarnings("unchecked")
 	@Test
 	void nonCompliantProducerFactory() throws Exception {