Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.GenericErrorHandler;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.listener.adapter.ReplyHeadersConfigurer;
import org.springframework.kafka.requestreply.ReplyingKafkaOperations;
Expand Down Expand Up @@ -98,6 +99,8 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe

private Boolean missingTopicsFatal;

private RecordInterceptor<K, V> recordInterceptor;

/**
* Specify a {@link ConsumerFactory} to use.
* @param consumerFactory The consumer factory.
Expand Down Expand Up @@ -280,6 +283,16 @@ public ContainerProperties getContainerProperties() {
return this.containerProperties;
}

/**
* Set an interceptor to be called before calling the listener.
* Does not apply to batch listeners.
* @param recordInterceptor the interceptor.
* @since 2.2.7
*/
public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
this.recordInterceptor = recordInterceptor;
}

@Override
public void afterPropertiesSet() {
if (this.errorHandler != null) {
Expand Down Expand Up @@ -356,6 +369,7 @@ protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint) {
else if (this.autoStartup != null) {
instance.setAutoStartup(this.autoStartup);
}
instance.setRecordInterceptor(this.recordInterceptor);
JavaUtils.INSTANCE
.acceptIfNotNull(this.phase, instance::setPhase)
.acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ public abstract class AbstractMessageListenerContainer<K, V>

private int topicCheckTimeout = DEFAULT_TOPIC_CHECK_TIMEOUT;

private RecordInterceptor<K, V> recordInterceptor;

private volatile boolean running = false;

private volatile boolean paused;
Expand Down Expand Up @@ -279,6 +281,20 @@ public void setTopicCheckTimeout(int topicCheckTimeout) {
this.topicCheckTimeout = topicCheckTimeout;
}

protected RecordInterceptor<K, V> getRecordInterceptor() {
return this.recordInterceptor;
}

/**
* Set an interceptor to be called before calling the listener.
* Does not apply to batch listeners.
* @param recordInterceptor the interceptor.
* @since 2.2.7
*/
public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
this.recordInterceptor = recordInterceptor;
}

@Override
public void setupMessageListener(Object messageListener) {
this.containerProperties.setMessageListener(messageListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ protected void doStart() {
container.setClientIdSuffix("-" + i);
container.setGenericErrorHandler(getGenericErrorHandler());
container.setAfterRollbackProcessor(getAfterRollbackProcessor());
container.setRecordInterceptor(getRecordInterceptor());
container.setEmergencyStop(() -> {
stop(() -> {
// NOSONAR
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private final Duration syncCommitTimeout;

private final RecordInterceptor<K, V> recordInterceptor = getRecordInterceptor();

private Map<TopicPartition, OffsetMetadata> definedPartitions;

private volatile Collection<TopicPartition> assignedPartitions;
Expand Down Expand Up @@ -1308,26 +1310,35 @@ private void invokeOnMessage(final ConsumerRecord<K, V> record,
ackCurrent(record, producer);
}

private void doInvokeOnMessage(final ConsumerRecord<K, V> record) {
switch (this.listenerType) {
case ACKNOWLEDGING_CONSUMER_AWARE:
this.listener.onMessage(record,
this.isAnyManualAck
? new ConsumerAcknowledgment(record)
: null, this.consumer);
break;
case CONSUMER_AWARE:
this.listener.onMessage(record, this.consumer);
break;
case ACKNOWLEDGING:
this.listener.onMessage(record,
this.isAnyManualAck
? new ConsumerAcknowledgment(record)
: null);
break;
case SIMPLE:
this.listener.onMessage(record);
break;
private void doInvokeOnMessage(final ConsumerRecord<K, V> recordArg) {
ConsumerRecord<K, V> record = recordArg;
if (this.recordInterceptor != null) {
record = this.recordInterceptor.intercept(record);
}
if (record == null) {
this.logger.debug(() -> "RecordInterceptor returned null, skipping: " + recordArg);
}
else {
switch (this.listenerType) {
case ACKNOWLEDGING_CONSUMER_AWARE:
this.listener.onMessage(record,
this.isAnyManualAck
? new ConsumerAcknowledgment(record)
: null, this.consumer);
break;
case CONSUMER_AWARE:
this.listener.onMessage(record, this.consumer);
break;
case ACKNOWLEDGING:
this.listener.onMessage(record,
this.isAnyManualAck
? new ConsumerAcknowledgment(record)
: null);
break;
case SIMPLE:
this.listener.onMessage(record);
break;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.lang.Nullable;

/**
* An interceptor for {@link ConsumerRecord} invoked by the listener
* container before invoking the listener.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Gary Russell
* @since 2.2.7
*
*/
@FunctionalInterface
public interface RecordInterceptor<K, V> {

/**
* Perform some action on the record or return a different one.
* If null is returned the record will be skipped.
* @param record the record.
* @return the record or null.
*/
@Nullable
ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record);

}
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,7 @@ public void testKeyConversion() throws Exception {
this.bytesKeyTemplate.send("annotated36", "foo".getBytes(), "bar");
assertThat(this.listener.keyLatch.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(this.listener.convertedKey).isEqualTo("foo");
assertThat(this.config.intercepted).isTrue();
}

@Test
Expand All @@ -761,7 +762,11 @@ public void testProjection() throws InterruptedException {
@EnableTransactionManagement(proxyTargetClass = true)
public static class Config implements KafkaListenerConfigurer {

private final CountDownLatch spyLatch = new CountDownLatch(2);
final CountDownLatch spyLatch = new CountDownLatch(2);

volatile Throwable globalErrorThrowable;

volatile boolean intercepted;

@Bean
public static PropertySourcesPlaceholderConfigurer ppc() {
Expand All @@ -784,8 +789,6 @@ public ChainedKafkaTransactionManager<Integer, String> cktm() {
return new ChainedKafkaTransactionManager<>(ktm(), transactionManager());
}

private Throwable globalErrorThrowable;

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
Expand Down Expand Up @@ -884,6 +887,10 @@ public KafkaListenerContainerFactory<?> bytesStringListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<byte[], String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(bytesStringConsumerFactory());
factory.setRecordInterceptor(record -> {
this.intercepted = true;
return record;
});
return factory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,13 @@ protected KafkaConsumer<Integer, String> createKafkaConsumer(String groupId, Str
ContainerProperties containerProps = new ContainerProperties(topic1);
containerProps.setLogContainerConfig(true);

final CountDownLatch latch = new CountDownLatch(4);
final CountDownLatch latch = new CountDownLatch(3);
final Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();
final List<String> payloads = new ArrayList<>();
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
ConcurrentMessageListenerContainerTests.this.logger.info("auto: " + message);
listenerThreadNames.add(Thread.currentThread().getName());
payloads.add(message.value());
latch.countDown();
});

Expand All @@ -144,6 +146,11 @@ protected KafkaConsumer<Integer, String> createKafkaConsumer(String groupId, Str
stopLatch.countDown();
}
});
CountDownLatch intercepted = new CountDownLatch(4);
container.setRecordInterceptor(record -> {
intercepted.countDown();
return record.value().equals("baz") ? null : record;
});
container.start();

ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
Expand All @@ -158,6 +165,7 @@ protected KafkaConsumer<Integer, String> createKafkaConsumer(String groupId, Str
template.sendDefault(0, "baz");
template.sendDefault(2, "qux");
template.flush();
assertThat(intercepted.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
for (String threadName : listenerThreadNames) {
assertThat(threadName).contains("-C-");
Expand All @@ -173,6 +181,7 @@ protected KafkaConsumer<Integer, String> createKafkaConsumer(String groupId, Str
Set<KafkaMessageListenerContainer<Integer, String>> children = new HashSet<>(containers);
container.stop();
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(payloads).containsExactlyInAnyOrder("foo", "bar", "qux");
events.forEach(e -> {
assertThat(e.getContainer(MessageListenerContainer.class)).isSameAs(container);
if (e instanceof ContainerStoppedEvent) {
Expand Down
4 changes: 4 additions & 0 deletions src/reference/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,10 @@ Two `MessageListenerContainer` implementations are provided:
The `KafkaMessageListenerContainer` receives all message from all topics or partitions on a single thread.
The `ConcurrentMessageListenerContainer` delegates to one or more `KafkaMessageListenerContainer` instances to provide multi-threaded consumption.

Starting with version 2.1.7, you can add a `RecordInterceptor` to the listener container; it will be invoked before calling the listener allowing inspection or modification of the record.
If the interceptor returns null, the listener is not called.
The interceptor is not invoked when the listener is a <<batch-listners, batch listener>>.

[[kafka-container]]
====== Using `KafkaMessageListenerContainer`

Expand Down
3 changes: 3 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ See <<seek-to-current>> for more information.
It is now possible to obtain the consumer's `group.id` property in the listener method.
See <<listener-group-id>> for more information.

The container has a new property `recordInterceptor` allowing records to be inspected or modified before invoking the listener.
See <<message-listener-container>> for more information.

==== ErrorHandler Changes

The `SeekToCurrentErrorHandler` now treats certain exceptions as fatal and disables retry for those, invoking the recoverer on first failure.
Expand Down