Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.adapter.DeDuplicationStrategy;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.converter.MessageConverter;

Expand Down Expand Up @@ -51,7 +51,7 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe

private MessageConverter messageConverter;

private DeDuplicationStrategy<K, V> deDuplicationStrategy;
private RecordFilterStrategy<K, V> recordFilterStrategy;

private ApplicationEventPublisher applicationEventPublisher;

Expand Down Expand Up @@ -95,10 +95,10 @@ public void setMessageConverter(MessageConverter messageConverter) {

/**
* Set the de-duplication strategy.
* @param deDuplicationStrategy the strategy.
* @param recordFilterStrategy the strategy.
*/
public void setDeDuplicationStrategy(DeDuplicationStrategy<K, V> deDuplicationStrategy) {
this.deDuplicationStrategy = deDuplicationStrategy;
public void setRecordFilterStrategy(RecordFilterStrategy<K, V> recordFilterStrategy) {
this.recordFilterStrategy = recordFilterStrategy;
}

@Override
Expand Down Expand Up @@ -133,8 +133,8 @@ public C createListenerContainer(KafkaListenerEndpoint endpoint) {
instance.setBeanName(endpoint.getId());
}

if (this.deDuplicationStrategy != null && endpoint instanceof AbstractKafkaListenerEndpoint) {
((AbstractKafkaListenerEndpoint<K, V>) endpoint).setDeDuplicationStrategy(this.deDuplicationStrategy);
if (this.recordFilterStrategy != null && endpoint instanceof AbstractKafkaListenerEndpoint) {
((AbstractKafkaListenerEndpoint<K, V>) endpoint).setRecordFilterStrategy(this.recordFilterStrategy);
}

endpoint.setupListenerContainer(instance, this.messageConverter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.adapter.DeDuplicationStrategy;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.util.Assert;

Expand Down Expand Up @@ -68,7 +68,7 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>

private String group;

private DeDuplicationStrategy<K, V> deDuplicationStrategy;
private RecordFilterStrategy<K, V> recordFilterStrategy;


@Override
Expand Down Expand Up @@ -196,16 +196,16 @@ public void afterPropertiesSet() {
}
}

protected DeDuplicationStrategy<K, V> getDeDuplicationStrategy() {
return this.deDuplicationStrategy;
protected RecordFilterStrategy<K, V> getRecordFilterStrategy() {
return this.recordFilterStrategy;
}

/**
* Set a {@link DeDuplicationStrategy} implementation.
* @param deDuplicationStrategy the strategy implementation.
* Set a {@link RecordFilterStrategy} implementation.
* @param recordFilterStrategy the strategy implementation.
*/
public void setDeDuplicationStrategy(DeDuplicationStrategy<K, V> deDuplicationStrategy) {
this.deDuplicationStrategy = deDuplicationStrategy;
public void setRecordFilterStrategy(RecordFilterStrategy<K, V> recordFilterStrategy) {
this.recordFilterStrategy = recordFilterStrategy;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ protected MessagingMessageListenerAdapter<K, V> createMessageListener(MessageLis
Assert.state(this.messageHandlerMethodFactory != null,
"Could not create message listener - MessageHandlerMethodFactory not set");
MessagingMessageListenerAdapter<K, V> messageListener = createMessageListenerInstance();
if (getDeDuplicationStrategy() != null) {
messageListener.setDeDuplicationStrategy(getDeDuplicationStrategy());
if (getRecordFilterStrategy() != null) {
messageListener.setRecordFilterStrategy(getRecordFilterStrategy());
}
messageListener.setHandlerMethod(configureListenerAdapter(messageListener));
if (messageConverter != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,26 @@
import org.springframework.util.Assert;

/**
* An abstract message listener adapter that implements de-duplication logic
* via a {@link DeDuplicationStrategy}.
* An abstract message listener adapter that implements record filter logic
* via a {@link RecordFilterStrategy}.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Gary Russell
*
*/
public abstract class AbstractDeDuplicatingMessageListener<K, V> {
public abstract class AbstractFilteringMessageListener<K, V> {

private final DeDuplicationStrategy<K, V> deDupStrategy;
private final RecordFilterStrategy<K, V> recordFilterStrategy;

protected AbstractDeDuplicatingMessageListener(DeDuplicationStrategy<K, V> deDupStrategy) {
Assert.notNull(deDupStrategy, "'deDupStrategy' cannot be null");
this.deDupStrategy = deDupStrategy;
protected AbstractFilteringMessageListener(RecordFilterStrategy<K, V> recordFilterStrategy) {
Assert.notNull(recordFilterStrategy, "'recordFilterStrategy' cannot be null");
this.recordFilterStrategy = recordFilterStrategy;
}

protected boolean isDuplicate(ConsumerRecord<K, V> consumerRecord) {
return this.deDupStrategy.isDuplicate(consumerRecord);
protected boolean filter(ConsumerRecord<K, V> consumerRecord) {
return this.recordFilterStrategy.filter(consumerRecord);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener.adapter;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;

/**
* A {@link AcknowledgingMessageListener} adapter that implements filter logic
* via a {@link RecordFilterStrategy}.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Gary Russell
*
*/
public class FilteringAcknowledgingMessageListenerAdapter<K, V> extends AbstractFilteringMessageListener<K, V>
implements AcknowledgingMessageListener<K, V> {

private final AcknowledgingMessageListener<K, V> delegate;

private final boolean ackDiscarded;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we add this option into the MessagingMessageListenerAdapter as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah - I was hoping to avoid that 😄

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to imagine a use case where not acknowledging a discarded message would be useful.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They might be acking manually on some condition (special marker record perhaps and not want random acks being done on their behalf.

I guess I am just a bit uncomfortable issuing automatic acks when they have specifically requested to use manual acks.

Copy link

@mbogoevici mbogoevici Jun 1, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. An option gives a choice then. Should we at least make this true by default - otherwise, in the case of manual acks, they would almost never have a chance to ack those messages and there's no chance that they would ack them in the future on replay - so unless there's a message that they're interested in, this would keep redelivering messages on each restart.


/**
* Create an instance with the supplied strategy and delegate listener.
* @param recordFilterStrategy the filter.
* @param delegate the delegate.
* @param ackDiscarded true to ack (commit offset for) discarded messages.
*/
public FilteringAcknowledgingMessageListenerAdapter(RecordFilterStrategy<K, V> recordFilterStrategy,
AcknowledgingMessageListener<K, V> delegate, boolean ackDiscarded) {
super(recordFilterStrategy);
this.delegate = delegate;
this.ackDiscarded = ackDiscarded;
}

@Override
public void onMessage(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment) {
if (!filter(consumerRecord)) {
this.delegate.onMessage(consumerRecord, acknowledgment);
}
else {
if (this.ackDiscarded) {
acknowledgment.acknowledge();
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,34 @@
import org.springframework.kafka.listener.MessageListener;

/**
* A {@link MessageListener} adapter that implements de-duplication logic
* via a DeDuplicationStrategy.
* A {@link MessageListener} adapter that implements filter logic
* via a {@link RecordFilterStrategy}.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Gary Russell
*
*/
public class DeDuplicatingMessageListenerAdapter<K, V> extends AbstractDeDuplicatingMessageListener<K, V>
public class FilteringMessageListenerAdapter<K, V> extends AbstractFilteringMessageListener<K, V>
implements MessageListener<K, V> {

private final MessageListener<K, V> delegate;

public DeDuplicatingMessageListenerAdapter(DeDuplicationStrategy<K, V> deDupStrategy,
/**
* Create an instance with the supplied strategy and delegate listener.
* @param recordFilterStrategy the filter.
* @param delegate the delegate.
*/
public FilteringMessageListenerAdapter(RecordFilterStrategy<K, V> recordFilterStrategy,
MessageListener<K, V> delegate) {
super(deDupStrategy);
super(recordFilterStrategy);
this.delegate = delegate;
}

@Override
public void onMessage(ConsumerRecord<K, V> consumerRecord) {
if (!isDuplicate(consumerRecord)) {
if (!filter(consumerRecord)) {
this.delegate.onMessage(consumerRecord);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class MessagingMessageListenerAdapter<K, V> extends AbstractAdaptableMess

private MessageConverter messageConverter = new MessagingMessageConverter();

private DeDuplicationStrategy<K, V> deDuplicationStrategy;
private RecordFilterStrategy<K, V> recordFilterStrategy;


public MessagingMessageListenerAdapter(Method method) {
Expand Down Expand Up @@ -103,16 +103,16 @@ public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment
invokeHandler(record, acknowledgment, message);
}

protected DeDuplicationStrategy<K, V> getDeDuplicationStrategy() {
return this.deDuplicationStrategy;
protected RecordFilterStrategy<K, V> getRecordFilterStrategy() {
return this.recordFilterStrategy;
}

/**
* Set a {@link DeDuplicationStrategy} implementation.
* @param deDuplicationStrategy the strategy implementation.
* Set a {@link RecordFilterStrategy} implementation.
* @param recordFilterStrategy the strategy implementation.
*/
public void setDeDuplicationStrategy(DeDuplicationStrategy<K, V> deDuplicationStrategy) {
this.deDuplicationStrategy = deDuplicationStrategy;
public void setRecordFilterStrategy(RecordFilterStrategy<K, V> recordFilterStrategy) {
this.recordFilterStrategy = recordFilterStrategy;
}

protected Message<?> toMessagingMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment) {
Expand All @@ -128,7 +128,7 @@ protected Message<?> toMessagingMessage(ConsumerRecord<K, V> record, Acknowledgm
* @return the result of invocation.
*/
private Object invokeHandler(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Message<?> message) {
if (this.deDuplicationStrategy != null && this.deDuplicationStrategy.isDuplicate(record)) {
if (this.recordFilterStrategy != null && this.recordFilterStrategy.filter(record)) {
return null;
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,23 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
* Implementations of this interface can signal that a message about
* to be delivered to a message listener is a duplicate.
* Implementations of this interface can signal that a record about
* to be delivered to a message listener should be discarded instead
* of being delivered.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Gary Russell
*
*/
public interface DeDuplicationStrategy<K, V> {
public interface RecordFilterStrategy<K, V> {

/**
* Return true if the record is a duplicate and should be discarded.
* Return true if the record should be discarded.
* @param consumerRecord the record.
* @return true to discard.
*/
boolean isDuplicate(ConsumerRecord<K, V> consumerRecord);
boolean filter(ConsumerRecord<K, V> consumerRecord);

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.adapter.DeDuplicationStrategy;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
Expand Down Expand Up @@ -101,7 +101,7 @@ public class EnableKafkaIntegrationTests {
public KafkaListenerEndpointRegistry registry;

@Autowired
private DeDupImpl deDup;
private RecordFilterImpl recordFilter;

@Test
public void testSimple() throws Exception {
Expand Down Expand Up @@ -140,7 +140,7 @@ public void testSimple() throws Exception {
template.flush();
assertThat(this.listener.latch7.await(20, TimeUnit.SECONDS)).isTrue();

assertThat(this.deDup.called).isTrue();
assertThat(this.recordFilter.called).isTrue();
}

@Test
Expand Down Expand Up @@ -214,13 +214,13 @@ public PlatformTransactionManager transactionManager() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setDeDuplicationStrategy(deDup());
factory.setRecordFilterStrategy(filter());
return factory;
}

@Bean
public DeDupImpl deDup() {
return new DeDupImpl();
public RecordFilterImpl filter() {
return new RecordFilterImpl();
}

@Bean
Expand Down Expand Up @@ -501,12 +501,12 @@ public void setBar(String bar) {

}

public static class DeDupImpl implements DeDuplicationStrategy<Integer, String> {
public static class RecordFilterImpl implements RecordFilterStrategy<Integer, String> {

private boolean called;

@Override
public boolean isDuplicate(ConsumerRecord<Integer, String> consumerRecord) {
public boolean filter(ConsumerRecord<Integer, String> consumerRecord) {
called = true;
return false;
}
Expand Down
12 changes: 6 additions & 6 deletions src/reference/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ public void listen(@Payload String foo,
}
----

===== Handling Duplicates
===== Filtering Messages

In certain scenarios, such as rebalancing, a message may be redelivered that has already been processed.
The framework cannot know whether such a message has been processed or not, that is an application-level
Expand All @@ -339,15 +339,15 @@ Receiver] pattern and Spring Integration provides an
http://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#idempotent-receiver
[implementation thereof].
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to this PR, but the divided link part brake it.
So the part in the [] should be moved to the same row as url.


The Spring for Apache Kafka project also provides some assistance by means of the `DeDuplicatingMessageListenerAdapter`
The Spring for Apache Kafka project also provides some assistance by means of the `FilteringMessageListenerAdapter`
classe, which can wrap your `MessageListener`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: classe

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The typo hasn't been fixed. Strange that my comment on the matter had gone.
NP. Will fix on merge.

This class takes an implementation of `DeDuplicationStrategy` where you implement the `isDuplicate` method to signal
This class takes an implementation of `RecordFilterStrategy` where you implement the `filter` method to signal
that a message is a duplicate and should be discarded.

Similarly, when using `@KafkaListener`, the `DeDuplicationStrategy` can be injected into the container factory.
Similarly, when using `@KafkaListener`, the `RecordFilterStrategy` can be injected into the container factory.

A wrapper is not provided for the `AcknowledgingMessageListener` because you might need to acknowledge the duplicated
message.
A `FilteringAcknowledgingMessageListenerAdapter` is also provided for wrapping an `AcknowledgingMessageListener`.
This has an additional property `ackDiscarded` which indicates whether the adapter should acknowledge the discarded record.

==== Serialization/Deserialization and Message Conversion

Expand Down