Skip to content

Commit 114f409

Browse files
garyrussellartembilan
authored andcommitted
GH-566: Use ConcurrentMaps for offset management
Fixes #566 In case the user listener hands over the `Acknowledgment` to another thread.
1 parent 3716111 commit 114f409

File tree

1 file changed

+5
-3
lines changed

1 file changed

+5
-3
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import java.util.Map;
2929
import java.util.Map.Entry;
3030
import java.util.concurrent.BlockingQueue;
31+
import java.util.concurrent.ConcurrentHashMap;
32+
import java.util.concurrent.ConcurrentMap;
3133
import java.util.concurrent.LinkedBlockingQueue;
3234
import java.util.concurrent.ScheduledFuture;
3335

@@ -320,7 +322,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
320322

321323
private final Consumer<K, V> consumer;
322324

323-
private final Map<String, Map<Integer, Long>> offsets = new HashMap<>();
325+
private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsets = new ConcurrentHashMap<>();
324326

325327
private final GenericMessageListener<?> genericListener;
326328

@@ -1202,7 +1204,7 @@ record = this.acks.poll();
12021204
}
12031205

12041206
private void addOffset(ConsumerRecord<K, V> record) {
1205-
this.offsets.computeIfAbsent(record.topic(), v -> new HashMap<>())
1207+
this.offsets.computeIfAbsent(record.topic(), v -> new ConcurrentHashMap<>())
12061208
.compute(record.partition(), (k, v) -> v == null ? record.offset() : Math.max(v, record.offset()));
12071209
}
12081210

@@ -1230,7 +1232,7 @@ private void commitIfNecessary() {
12301232

12311233
private Map<TopicPartition, OffsetAndMetadata> buildCommits() {
12321234
Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
1233-
for (Entry<String, Map<Integer, Long>> entry : this.offsets.entrySet()) {
1235+
for (Entry<String, ConcurrentMap<Integer, Long>> entry : this.offsets.entrySet()) {
12341236
for (Entry<Integer, Long> offset : entry.getValue().entrySet()) {
12351237
commits.put(new TopicPartition(entry.getKey(), offset.getKey()),
12361238
new OffsetAndMetadata(offset.getValue() + 1));

0 commit comments

Comments
 (0)