Skip to content

Commit 5ddc0b8

Browse files
garyrussellartembilan
authored andcommitted
GH-2521: Fix __listener in id Regression
Resolves #2521 The `__listener` pseudo bean name is not available for bean resolution in the `@KafkaListener.id` attribute.
1 parent d34394e commit 5ddc0b8

File tree

2 files changed

+6
-3
lines changed

2 files changed

+6
-3
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -481,10 +481,10 @@ protected void processKafkaListener(KafkaListener kafkaListener, Method method,
481481
Method methodToUse = checkProxy(method, bean);
482482
MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
483483
endpoint.setMethod(methodToUse);
484-
endpoint.setId(getEndpointId(kafkaListener));
485484

486485
String beanRef = kafkaListener.beanRef();
487486
this.listenerScope.addListener(beanRef, bean);
487+
endpoint.setId(getEndpointId(kafkaListener));
488488
String[] topics = resolveTopics(kafkaListener);
489489
TopicPartitionOffset[] tps = resolveTopicPartitions(kafkaListener);
490490
if (!processMainAndRetryListeners(kafkaListener, bean, beanName, methodToUse, endpoint, topics, tps)) {

spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.springframework.kafka.KafkaException.Level;
4747
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
4848
import org.springframework.kafka.config.KafkaListenerContainerFactory;
49+
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
4950
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
5051
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
5152
import org.springframework.kafka.core.KafkaAdmin;
@@ -97,7 +98,8 @@ public class BatchListenerConversionTests {
9798
private KafkaTemplate<Integer, Object> template;
9899

99100
@Test
100-
public void testBatchOfPojos() throws Exception {
101+
public void testBatchOfPojos(@Autowired KafkaListenerEndpointRegistry registry) throws Exception {
102+
assertThat(registry.getListenerContainerIds()).contains("blc1.id", "blc2.id");
101103
doTest(this.listener1, "blc1");
102104
doTest(this.listener2, "blc2");
103105
}
@@ -273,7 +275,8 @@ public KafkaListenerContainerFactory<?> getContainerFactory() {
273275
return this.cf;
274276
}
275277

276-
@KafkaListener(topics = "#{__listener.topic}", groupId = "#{__listener.topic}.group",
278+
@KafkaListener(id = "#{__listener.topic}.id", topics = "#{__listener.topic}",
279+
groupId = "#{__listener.topic}.group",
277280
containerFactory = "#{__listener.containerFactory}")
278281
// @SendTo("foo") test WARN log for void return
279282
public void listen1(List<Foo> foos, @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,

0 commit comments

Comments
 (0)