Skip to content

Commit 549e243

Browse files
garyrussellartembilan
authored andcommitted
GH-2208: Fix Manual Nack with Mutating Interceptor
Resolves #2208 `equals()` test on `ConsumerRecord` fails (not implemented). Compare topic, partition and offset instead. **cherry-pick to 2.9.x, 2.8.x, 2.7.x** # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
1 parent afc689a commit 549e243

File tree

3 files changed

+23
-2
lines changed

3 files changed

+23
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2201,14 +2201,21 @@ private void handleNack(final ConsumerRecords<K, V> records, final ConsumerRecor
22012201
Iterator<ConsumerRecord<K, V>> iterator2 = records.iterator();
22022202
while (iterator2.hasNext()) {
22032203
ConsumerRecord<K, V> next = iterator2.next();
2204-
if (next.equals(record) || list.size() > 0) {
2204+
if (list.size() > 0 || recordsEqual(record, next)) {
22052205
list.add(next);
22062206
}
22072207
}
22082208
SeekUtils.doSeeks(list, this.consumer, null, true, (rec, ex) -> false, this.logger); // NOSONAR
22092209
nackSleepAndReset();
22102210
}
22112211

2212+
2213+
private boolean recordsEqual(ConsumerRecord<K, V> rec1, ConsumerRecord<K, V> rec2) {
2214+
return rec1.topic().equals(rec2.topic())
2215+
&& rec1.partition() == rec2.partition()
2216+
&& rec1.offset() == rec2.offset();
2217+
}
2218+
22122219
private void nackSleepAndReset() {
22132220
try {
22142221
Thread.sleep(this.nackSleep);

spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ public interface RecordInterceptor<K, V> {
3737

3838
/**
3939
* Perform some action on the record or return a different one. If null is returned
40-
* the record will be skipped. Invoked before the listener.
40+
* the record will be skipped. Invoked before the listener. IMPORTANT; if this method
41+
* returns a different record, the topic, partition and offset must not be changed
42+
* to avoid undesirable side-effects.
4143
* @param record the record.
4244
* @return the record or null.
4345
* @deprecated in favor of {@link #intercept(ConsumerRecord, Consumer)} which will

spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTests.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.springframework.kafka.listener.ContainerProperties.AckMode;
5959
import org.springframework.kafka.support.Acknowledgment;
6060
import org.springframework.kafka.test.utils.KafkaTestUtils;
61+
import org.springframework.lang.Nullable;
6162
import org.springframework.test.annotation.DirtiesContext;
6263
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
6364

@@ -212,6 +213,17 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
212213
factory.setErrorHandler(new SeekToCurrentErrorHandler());
213214
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
214215
factory.getContainerProperties().setMissingTopicsFatal(false);
216+
factory.setRecordInterceptor(new RecordInterceptor() {
217+
218+
@Override
219+
@Nullable
220+
public ConsumerRecord intercept(ConsumerRecord record, Consumer consumer) {
221+
return new ConsumerRecord(record.topic(), record.partition(), record.offset(), 0L,
222+
TimestampType.NO_TIMESTAMP_TYPE, 0, 0, record.key(), record.value(), record.headers(),
223+
Optional.empty());
224+
}
225+
226+
});
215227
return factory;
216228
}
217229

0 commit comments

Comments
 (0)