Skip to content

Commit 3d90d6a

Browse files
committed
refactor: allow topic pattern for validtion
Signed-off-by: moonyoungCHAE <[email protected]>
1 parent 01db10f commit 3d90d6a

File tree

1 file changed

+16
-1
lines changed

1 file changed

+16
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -666,10 +666,11 @@ private void processKafkaListenerAnnotation(MethodKafkaListenerEndpoint<?, ?> en
666666
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
667667
endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
668668

669-
Assert.state((topics.length > 0) ^ (tps.length > 0), "Only one of @Topic or @TopicPartition is allowed");
669+
assertTopic(kafkaListener);
670670
endpoint.setTopicPartitions(tps);
671671
endpoint.setTopics(topics);
672672
endpoint.setTopicPattern(resolvePattern(kafkaListener));
673+
673674
endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
674675
endpoint.setListenerInfo(resolveExpressionAsBytes(kafkaListener.info(), "info"));
675676
String group = kafkaListener.containerGroup();
@@ -860,6 +861,20 @@ private String getEndpointGroupId(KafkaListener kafkaListener, @Nullable String
860861
return groupId;
861862
}
862863

864+
private void assertTopic(KafkaListener kafkaListener) {
865+
int count = 0;
866+
if (!kafkaListener.topicPattern().isEmpty()) {
867+
count++;
868+
}
869+
if (kafkaListener.topics().length > 0) {
870+
count++;
871+
}
872+
if (kafkaListener.topicPartitions().length > 0) {
873+
count++;
874+
}
875+
Assert.state(count == 1, "Only one of @Topic or @TopicPartition or @TopicPattern must be provided");
876+
}
877+
863878
private TopicPartitionOffset[] resolveTopicPartitions(KafkaListener kafkaListener) {
864879
TopicPartition[] topicPartitions = kafkaListener.topicPartitions();
865880
List<TopicPartitionOffset> result = new ArrayList<>();

0 commit comments

Comments
 (0)