Skip to content

Commit 950a2ea

Browse files
garyrussellartembilan
authored andcommitted
GH-2410: Disallow nack() with Out of Order Commits
Resolves #2410 `nack()` cannot be used with out of order commits - the contract means commit all previous offsets and redeliver remaining. - the appliation might nack multiple records. **cherry-pick to 2.9.x, 2.8.x** # Conflicts: # spring-kafka-docs/src/main/asciidoc/kafka.adoc
1 parent 0547dc7 commit 950a2ea

File tree

3 files changed

+16
-12
lines changed

3 files changed

+16
-12
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1260,6 +1260,8 @@ NOTE: If you want to commit a partial batch, using `nack()`, When using transact
12601260

12611261
IMPORTANT: `nack()` can only be called on the consumer thread that invokes your listener.
12621262

1263+
IMPORTANT: `nack()` is not allowed when using <<ooo-commits, Out of Order Commits>>.
1264+
12631265
With a record listener, when `nack()` is called, any pending offsets are committed, the remaining records from the last poll are discarded, and seeks are performed on their partitions so that the failed record and unprocessed records are redelivered on the next `poll()`.
12641266
The consumer can be paused before redelivery, by setting the `sleep` argument.
12651267
This is similar functionality to throwing an exception when the container is configured with a `DefaultErrorHandler`.

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3211,14 +3211,10 @@ public void acknowledge() {
32113211
public void nack(Duration sleep) {
32123212
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
32133213
"nack() can only be called on the consumer thread");
3214+
Assert.state(!ListenerConsumer.this.containerProperties.isAsyncAcks(),
3215+
"nack() is not supported with out-of-order commits (asyncAcks=true)");
32143216
Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative");
32153217
ListenerConsumer.this.nackSleepDurationMillis = sleep.toMillis();
3216-
synchronized (ListenerConsumer.this) {
3217-
if (ListenerConsumer.this.offsetsInThisBatch != null) {
3218-
ListenerConsumer.this.offsetsInThisBatch.forEach((part, recs) -> recs.clear());
3219-
ListenerConsumer.this.deferredOffsets.forEach((part, recs) -> recs.clear());
3220-
}
3221-
}
32223218
}
32233219

32243220
@Override
@@ -3259,16 +3255,12 @@ public void acknowledge() {
32593255
public void nack(int index, Duration sleep) {
32603256
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
32613257
"nack() can only be called on the consumer thread");
3258+
Assert.state(!ListenerConsumer.this.containerProperties.isAsyncAcks(),
3259+
"nack() is not supported with out-of-order commits (asyncAcks=true)");
32623260
Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative");
32633261
Assert.isTrue(index >= 0 && index < this.records.count(), "index out of bounds");
32643262
ListenerConsumer.this.nackIndex = index;
32653263
ListenerConsumer.this.nackSleepDurationMillis = sleep.toMillis();
3266-
synchronized (ListenerConsumer.this) {
3267-
if (ListenerConsumer.this.offsetsInThisBatch != null) {
3268-
ListenerConsumer.this.offsetsInThisBatch.forEach((part, recs) -> recs.clear());
3269-
ListenerConsumer.this.deferredOffsets.forEach((part, recs) -> recs.clear());
3270-
}
3271-
}
32723264
int i = 0;
32733265
List<ConsumerRecord<K, V>> toAck = new LinkedList<>();
32743266
for (ConsumerRecord<K, V> record : this.records) {

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -689,7 +689,16 @@ private void testInOrderAck(AckMode ackMode) throws Exception {
689689
containerProps.setAsyncAcks(true);
690690
final CountDownLatch latch = new CountDownLatch(4);
691691
final List<Acknowledgment> acks = new ArrayList<>();
692+
final AtomicReference<IllegalStateException> illegal = new AtomicReference<>();
692693
AcknowledgingMessageListener<Integer, String> messageListener = (data, ack) -> {
694+
if (latch.getCount() == 4) {
695+
try {
696+
ack.nack(Duration.ofSeconds(1));
697+
}
698+
catch (IllegalStateException ex) {
699+
illegal.set(ex);
700+
}
701+
}
693702
latch.countDown();
694703
acks.add(ack);
695704
if (latch.getCount() == 0) {
@@ -720,6 +729,7 @@ private void testInOrderAck(AckMode ackMode) throws Exception {
720729
verify(consumer).commitSync(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(4L)),
721730
Duration.ofMinutes(1));
722731
container.stop();
732+
assertThat(illegal.get()).isNotNull();
723733
}
724734

725735
@Test

0 commit comments

Comments
 (0)