Skip to content

Commit c56365e

Browse files
garyrussell authored and artembilan committed
GH-623: Fix AckMode.COUNT
Fixes #623. Grab ack count before moving acks to offsets. # Conflicts: # spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java
1 parent 7105774 commit c56365e

File tree

3 files changed

+71
-1
lines changed

3 files changed

+71
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1051,8 +1051,8 @@ private void sendOffsetsToTransaction(Producer producer) {
10511051
}
10521052

10531053
private void processCommits() {
1054-
handleAcks();
10551054
this.count += this.acks.size();
1055+
handleAcks();
10561056
long now;
10571057
AckMode ackMode = this.containerProperties.getAckMode();
10581058
if (!this.isManualImmediateAck) {

spring-kafka/src/main/java/org/springframework/kafka/support/SendResult.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,9 @@ public RecordMetadata getRecordMetadata() {
4747
return this.recordMetadata;
4848
}
4949

50+
@Override
51+
public String toString() {
52+
return "SendResult [producerRecord=" + this.producerRecord + ", recordMetadata=" + this.recordMetadata + "]";
53+
}
54+
5055
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1607,6 +1607,71 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
16071607
logger.info("Stop rebalance after failed record");
16081608
}
16091609

1610+
@SuppressWarnings({ "unchecked", "rawtypes" })
1611+
@Test
1612+
public void testAckModeCount() throws Exception {
1613+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
1614+
Consumer<Integer, String> consumer = mock(Consumer.class);
1615+
given(cf.createConsumer(isNull(), eq("clientId"), isNull())).willReturn(consumer);
1616+
TopicPartition topicPartition = new TopicPartition("foo", 0);
1617+
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records1 = new HashMap<>();
1618+
records1.put(topicPartition, Arrays.asList(
1619+
new ConsumerRecord<>("foo", 0, 0L, 1, "foo"),
1620+
new ConsumerRecord<>("foo", 0, 1L, 1, "bar")));
1621+
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records2 = new HashMap<>();
1622+
records2.put(topicPartition, Arrays.asList(
1623+
new ConsumerRecord<>("foo", 0, 2L, 1, "baz"),
1624+
new ConsumerRecord<>("foo", 0, 3L, 1, "qux"))); // commit (4 >= 3)
1625+
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records3 = new HashMap<>();
1626+
records3.put(topicPartition, Arrays.asList(
1627+
new ConsumerRecord<>("foo", 0, 4L, 1, "fiz"),
1628+
new ConsumerRecord<>("foo", 0, 5L, 1, "buz"),
1629+
new ConsumerRecord<>("foo", 0, 6L, 1, "bif"))); // commit (3 >= 3)
1630+
ConsumerRecords<Integer, String> consumerRecords1 = new ConsumerRecords<>(records1);
1631+
ConsumerRecords<Integer, String> consumerRecords2 = new ConsumerRecords<>(records2);
1632+
ConsumerRecords<Integer, String> consumerRecords3 = new ConsumerRecords<>(records3);
1633+
ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
1634+
AtomicInteger which = new AtomicInteger();
1635+
given(consumer.poll(anyLong())).willAnswer(i -> {
1636+
Thread.sleep(50);
1637+
int recordsToUse = which.incrementAndGet();
1638+
switch (recordsToUse) {
1639+
case 1:
1640+
return consumerRecords1;
1641+
case 2:
1642+
return consumerRecords2;
1643+
case 3:
1644+
return consumerRecords3;
1645+
default:
1646+
return emptyRecords;
1647+
}
1648+
});
1649+
final CountDownLatch commitLatch = new CountDownLatch(2);
1650+
willAnswer(i -> {
1651+
commitLatch.countDown();
1652+
return null;
1653+
}).given(consumer).commitSync(any(Map.class));
1654+
given(consumer.assignment()).willReturn(records1.keySet());
1655+
TopicPartitionInitialOffset[] topicPartitionOffset = new TopicPartitionInitialOffset[] {
1656+
new TopicPartitionInitialOffset("foo", 0) };
1657+
ContainerProperties containerProps = new ContainerProperties(topicPartitionOffset);
1658+
containerProps.setAckMode(AckMode.COUNT);
1659+
containerProps.setAckCount(3);
1660+
containerProps.setClientId("clientId");
1661+
AtomicInteger recordCount = new AtomicInteger();
1662+
containerProps.setMessageListener((MessageListener) r -> {
1663+
recordCount.incrementAndGet();
1664+
});
1665+
KafkaMessageListenerContainer<Integer, String> container =
1666+
new KafkaMessageListenerContainer<>(cf, containerProps);
1667+
container.start();
1668+
assertThat(commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
1669+
assertThat(recordCount.get()).isEqualTo(7);
1670+
verify(consumer).commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(4L)));
1671+
verify(consumer).commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(7L)));
1672+
container.stop();
1673+
}
1674+
16101675
private Consumer<?, ?> spyOnConsumer(KafkaMessageListenerContainer<Integer, String> container) {
16111676
Consumer<?, ?> consumer = spy(
16121677
KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer", Consumer.class));

0 commit comments

Comments (0)