Skip to content

Commit e08854a

Browse files
garyrussellartembilan
authored andcommitted
GH-566: Ack Concurrency Issue
Fixes #566 If `Acknowledgment.acknowledge()` is called on a foreign thread, there is a concurrency problem with the `offsets` field; the consumer thread might `clear()` unprocessed acks. Furthermore, `AckMode.MANUAL_IMMEDIATE` cannot use the `Consumer` object if the ack is called on a foreign thread - the `Consumer` is not thread-safe. - Revert `offsets` to simple `HashMap`s - Only reference `offsets` on the consumer thread - enqueue foreign acks into the `acks` queue (even "immediate" acks) **Cherry pick to 2.0.x (and 1.3.x, fixing the lambda in the test and the diamond operators).** # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
1 parent b1649d2 commit e08854a

File tree

2 files changed

+91
-6
lines changed

2 files changed

+91
-6
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.kafka.common.errors.WakeupException;
4949

5050
import org.springframework.core.task.SimpleAsyncTaskExecutor;
51+
import org.springframework.kafka.KafkaException;
5152
import org.springframework.kafka.core.ConsumerFactory;
5253
import org.springframework.kafka.core.KafkaResourceHolder;
5354
import org.springframework.kafka.core.ProducerFactoryUtils;
@@ -346,6 +347,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
346347

347348
private volatile Collection<TopicPartition> assignedPartitions;
348349

350+
private volatile Thread consumerThread;
351+
349352
private int count;
350353

351354
private long last = System.currentTimeMillis();
@@ -595,6 +598,7 @@ public boolean isLongLived() {
595598

596599
@Override
597600
public void run() {
601+
this.consumerThread = Thread.currentThread();
598602
if (this.genericListener instanceof ConsumerSeekAware) {
599603
((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);
600604
}
@@ -705,16 +709,27 @@ record = this.acks.poll();
705709
}
706710

707711
private void processAck(ConsumerRecord<K, V> record) {
708-
if (this.isManualImmediateAck) {
712+
if (!Thread.currentThread().equals(this.consumerThread)) {
709713
try {
710-
ackImmediate(record);
714+
this.acks.put(record);
711715
}
712-
catch (WakeupException e) {
713-
// ignore - not polling
716+
catch (InterruptedException e) {
717+
Thread.currentThread().interrupt();
718+
throw new KafkaException("Interrupted while storing ack", e);
714719
}
715720
}
716721
else {
717-
addOffset(record);
722+
if (this.isManualImmediateAck) {
723+
try {
724+
ackImmediate(record);
725+
}
726+
catch (WakeupException e) {
727+
// ignore - not polling
728+
}
729+
}
730+
else {
731+
addOffset(record);
732+
}
718733
}
719734
}
720735

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,77 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
535535
container.stop();
536536
}
537537

538+
@Test
539+
public void testRecordAckMockForeignThread() throws Exception {
540+
testRecordAckMockForeignThreadGuts(AckMode.MANUAL);
541+
}
542+
543+
@Test
544+
public void testRecordAckMockForeignThreadImmediate() throws Exception {
545+
testRecordAckMockForeignThreadGuts(AckMode.MANUAL_IMMEDIATE);
546+
}
547+
548+
@SuppressWarnings("unchecked")
549+
private void testRecordAckMockForeignThreadGuts(AckMode ackMode) throws Exception {
550+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
551+
Consumer<Integer, String> consumer = mock(Consumer.class);
552+
given(cf.createConsumer(isNull(), eq("clientId"), isNull())).willReturn(consumer);
553+
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
554+
records.put(new TopicPartition("foo", 0), Arrays.asList(
555+
new ConsumerRecord<>("foo", 0, 0L, 1, "foo"),
556+
new ConsumerRecord<>("foo", 0, 1L, 1, "bar")));
557+
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
558+
given(consumer.poll(anyLong())).willAnswer(i -> {
559+
Thread.sleep(50);
560+
return consumerRecords;
561+
});
562+
TopicPartitionInitialOffset[] topicPartition = new TopicPartitionInitialOffset[] {
563+
new TopicPartitionInitialOffset("foo", 0) };
564+
ContainerProperties containerProps = new ContainerProperties(topicPartition);
565+
containerProps.setAckMode(ackMode);
566+
final CountDownLatch latch = new CountDownLatch(2);
567+
final List<Acknowledgment> acks = new ArrayList<>();
568+
final AtomicReference<Thread> consumerThread = new AtomicReference<>();
569+
AcknowledgingMessageListener<Integer, String> messageListener = spy(
570+
new AcknowledgingMessageListener<Integer, String>() {
571+
572+
@Override
573+
public void onMessage(ConsumerRecord<Integer, String> data, Acknowledgment acknowledgment) {
574+
acks.add(acknowledgment);
575+
consumerThread.set(Thread.currentThread());
576+
latch.countDown();
577+
if (latch.getCount() == 0) {
578+
records.clear();
579+
}
580+
}
581+
582+
});
583+
584+
final CountDownLatch commitLatch = new CountDownLatch(1);
585+
final AtomicReference<Thread> commitThread = new AtomicReference<>();
586+
willAnswer(i -> {
587+
commitThread.set(Thread.currentThread());
588+
commitLatch.countDown();
589+
return null;
590+
}
591+
).given(consumer).commitSync(any(Map.class));
592+
593+
containerProps.setMessageListener(messageListener);
594+
containerProps.setClientId("clientId");
595+
KafkaMessageListenerContainer<Integer, String> container =
596+
new KafkaMessageListenerContainer<>(cf, containerProps);
597+
container.start();
598+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
599+
acks.get(1).acknowledge();
600+
assertThat(commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
601+
InOrder inOrder = inOrder(messageListener, consumer);
602+
inOrder.verify(consumer).poll(1000);
603+
inOrder.verify(messageListener, times(2)).onMessage(any(ConsumerRecord.class), any(Acknowledgment.class));
604+
inOrder.verify(consumer).commitSync(any(Map.class));
605+
container.stop();
606+
assertThat(commitThread.get()).isSameAs(consumerThread.get());
607+
}
608+
538609
@SuppressWarnings("unchecked")
539610
@Test
540611
public void testBrokerDownEvent() throws Exception {
@@ -1484,7 +1555,6 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
14841555
final CountDownLatch commitLatch = new CountDownLatch(2);
14851556
willAnswer(invocation -> {
14861557

1487-
@SuppressWarnings({ "unchecked" })
14881558
Map<TopicPartition, OffsetAndMetadata> map = invocation.getArgument(0);
14891559
try {
14901560
return invocation.callRealMethod();

0 commit comments

Comments
 (0)