From 665a345e9373ac437ecefabcb6659d7b4a339d6f Mon Sep 17 00:00:00 2001 From: Janek Lasocki-Biczysko Date: Mon, 24 Feb 2025 09:46:42 +0000 Subject: [PATCH 1/4] feat: Replace LinkedList with ArrayList As per https://github.com/spring-projects/spring-kafka/issues/3764 Signed-off-by: Janek Lasocki-Biczysko --- .../kafka/listener/KafkaMessageListenerContainer.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index a859cce0bb..a124f3f131 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -16,6 +16,7 @@ package org.springframework.kafka.listener; +import java.lang.reflect.Array; import java.nio.ByteBuffer; import java.time.Duration; import java.util.AbstractMap.SimpleEntry; @@ -2238,11 +2239,14 @@ protected void doInTransactionWithoutResult(TransactionStatus status) { private List> createRecordList(final ConsumerRecords records) { Iterator> iterator = records.iterator(); - List> list = new LinkedList<>(); + @SuppressWarnings("unchecked") ConsumerRecord[] recordsArray = + (ConsumerRecord[]) Array.newInstance(ConsumerRecord.class, records.count()); + int index = 0; while (iterator.hasNext()) { - list.add(iterator.next()); + recordsArray[index] = iterator.next(); + index += 1; } - return list; + return Arrays.asList(recordsArray); } /** From 37cc715b0f1049cc71cbd6823419de291bfb74c1 Mon Sep 17 00:00:00 2001 From: Janek Lasocki-Biczysko Date: Mon, 24 Feb 2025 16:45:21 +0000 Subject: [PATCH 2/4] update RecordFilterStrategy to use stream.filter Signed-off-by: Janek Lasocki-Biczysko --- .../kafka/listener/KafkaMessageListenerContainer.java | 1 + .../kafka/listener/adapter/RecordFilterStrategy.java | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index a124f3f131..6aaab0c663 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -173,6 +173,7 @@ * @author Sanghyeok An * @author Christian Fredriksson * @author Timofey Barabanov + * @author Janek Lasocki-Biczysko */ public class KafkaMessageListenerContainer // NOSONAR line count extends AbstractMessageListenerContainer implements ConsumerPauseResumeEventPublisher { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordFilterStrategy.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordFilterStrategy.java index aa06755512..a607b501cb 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordFilterStrategy.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordFilterStrategy.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2024 the original author or authors. + * Copyright 2016-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,6 +32,7 @@ * * @author Gary Russell * @author Sanghyeok An + * @author Janek Lasocki-Biczysko */ public interface RecordFilterStrategy { @@ -50,8 +51,7 @@ public interface RecordFilterStrategy { * @since 2.8 */ default List> filterBatch(List> records) { - records.removeIf(this::filter); - return records; + return records.stream().filter(record -> !this.filter(record)).toList(); } /** From 2e3e2a9f4e0fe18e96a781b1034fd8efba4ea5c0 Mon Sep 17 00:00:00 2001 From: Janek Lasocki-Biczysko Date: Mon, 24 Feb 2025 16:58:42 +0000 Subject: [PATCH 3/4] ensure default RecordFilterStrategy#filterBatch returns a mutable list Signed-off-by: Janek Lasocki-Biczysko --- .../kafka/listener/adapter/RecordFilterStrategy.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordFilterStrategy.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordFilterStrategy.java index a607b501cb..f233887c4f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordFilterStrategy.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordFilterStrategy.java @@ -16,6 +16,7 @@ package org.springframework.kafka.listener.adapter; +import java.util.Arrays; import java.util.List; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -50,8 +51,10 @@ public interface RecordFilterStrategy { * @return the filtered records. * @since 2.8 */ + @SuppressWarnings("unchecked") default List> filterBatch(List> records) { - return records.stream().filter(record -> !this.filter(record)).toList(); + var recordsArray = records.stream().filter(record -> !this.filter(record)).toArray(ConsumerRecord[]::new); + return Arrays.asList(recordsArray); } /** From 14e385ef2b8f18629242ca2635f9c13dcd8db9d3 Mon Sep 17 00:00:00 2001 From: Janek Lasocki-Biczysko Date: Mon, 24 Feb 2025 19:54:47 +0000 Subject: [PATCH 4/4] turns the simplest approach works best --- .../listener/KafkaMessageListenerContainer.java | 13 +++---------- .../listener/adapter/RecordFilterStrategy.java | 9 +++------ 2 files changed, 6 insertions(+), 16 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 6aaab0c663..87222dfea0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -16,7 +16,6 @@ package org.springframework.kafka.listener; -import java.lang.reflect.Array; import java.nio.ByteBuffer; import java.time.Duration; import java.util.AbstractMap.SimpleEntry; @@ -2239,15 +2238,9 @@ protected void doInTransactionWithoutResult(TransactionStatus status) { } private List> createRecordList(final ConsumerRecords records) { - Iterator> iterator = records.iterator(); - @SuppressWarnings("unchecked") ConsumerRecord[] recordsArray = - (ConsumerRecord[]) Array.newInstance(ConsumerRecord.class, records.count()); - int index = 0; - while (iterator.hasNext()) { - recordsArray[index] = iterator.next(); - index += 1; - } - return Arrays.asList(recordsArray); + List> recordList = new ArrayList<>(records.count()); + records.forEach(recordList::add); + return recordList; } /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordFilterStrategy.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordFilterStrategy.java index f233887c4f..aa06755512 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordFilterStrategy.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordFilterStrategy.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2025 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +16,6 @@ package org.springframework.kafka.listener.adapter; -import java.util.Arrays; import java.util.List; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -33,7 +32,6 @@ * * @author Gary Russell * @author Sanghyeok An - * @author Janek Lasocki-Biczysko */ public interface RecordFilterStrategy { @@ -51,10 +49,9 @@ public interface RecordFilterStrategy { * @return the filtered records. * @since 2.8 */ - @SuppressWarnings("unchecked") default List> filterBatch(List> records) { - var recordsArray = records.stream().filter(record -> !this.filter(record)).toArray(ConsumerRecord[]::new); - return Arrays.asList(recordsArray); + records.removeIf(this::filter); + return records; } /**