From 5e09723baf0b1ec5a770d7cab2fcb01044e767fa Mon Sep 17 00:00:00 2001 From: Christian Fredriksson Date: Wed, 10 Sep 2025 21:23:10 +0200 Subject: [PATCH] Allow configuring CompositeBatchInterceptor This changes allows configuring CompositeBatchInterceptor on AbstractMessageListenerContainer, in same way as CompositeRecordInterceptor. Signed-off-by: Christian Fredriksson --- .../receiving-messages/message-listener-container.adoc | 4 ++-- .../listener/AbstractMessageListenerContainer.java | 8 +++++++- .../kafka/listener/CompositeBatchInterceptor.java | 10 ++++++++++ 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc index 3461b74c59..2cf7287375 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc @@ -22,8 +22,8 @@ IMPORTANT: If the interceptor mutates the record (by creating a new one), the `t The `CompositeRecordInterceptor` and `CompositeBatchInterceptor` can be used to invoke multiple interceptors. -Starting with version 4.0, `AbstractMessageListenerContainer` exposes `getRecordInterceptor()` as a public method. -If the returned interceptor is an instance of `CompositeRecordInterceptor`, additional `RecordInterceptor` instances can be added to it even after the container instance extending `AbstractMessageListenerContainer` has been created and a `RecordInterceptor` has already been configured. +Starting with version 4.0, `AbstractMessageListenerContainer` exposes `getRecordInterceptor()` and `getBatchInterceptor()` as public methods. +If the returned interceptor is an instance of `CompositeRecordInterceptor` or `CompositeBatchInterceptor`, additional `RecordInterceptor` or `BatchInterceptor` instances can be added to it even after the container instance extending `AbstractMessageListenerContainer` has been created and a `RecordInterceptor` or `BatchInterceptor` has already been configured. The following example shows how to do so: [source, java] diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java index b42e69fc42..66bf4bc67b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java @@ -72,6 +72,7 @@ * @author Soby Chacko * @author Sanghyeok An * @author Lokesh Alamuri + * @author Christian Fredriksson */ public abstract class AbstractMessageListenerContainer implements GenericMessageListenerContainer, BeanNameAware, ApplicationEventPublisherAware, @@ -480,7 +481,12 @@ public void setRecordInterceptor(@Nullable RecordInterceptor recordInterce this.recordInterceptor = recordInterceptor; } - protected @Nullable BatchInterceptor getBatchInterceptor() { + /** + * Get the {@link BatchInterceptor} for modification, if configured. + * @return the {@link BatchInterceptor}, or {@code null} if not configured + * @since 4.0 + */ + public @Nullable BatchInterceptor getBatchInterceptor() { return this.batchInterceptor; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeBatchInterceptor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeBatchInterceptor.java index eb3058469a..909813b351 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeBatchInterceptor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeBatchInterceptor.java @@ -34,6 +34,7 @@ * @param the value type. * * @author Gary Russell + * @author Christian Fredriksson * @since 2.7 * */ @@ -85,4 +86,13 @@ public void clearThreadState(Consumer consumer) { this.delegates.forEach(del -> del.clearThreadState(consumer)); } + /** + * Add an {@link BatchInterceptor} to delegates. + * @param batchInterceptor the interceptor. + * @since 4.0 + */ + public void addBatchInterceptor(BatchInterceptor batchInterceptor) { + this.delegates.add(batchInterceptor); + } + }