diff --git a/spring-kafka-docs/src/main/asciidoc/kafka.adoc b/spring-kafka-docs/src/main/asciidoc/kafka.adoc index 1cb0c7d5b8..d7271b2669 100644 --- a/spring-kafka-docs/src/main/asciidoc/kafka.adoc +++ b/spring-kafka-docs/src/main/asciidoc/kafka.adoc @@ -2717,6 +2717,59 @@ Also see `interceptBeforeTx`. |n/a |A reference to all child `KafkaMessageListenerContainer` s. |=== + +[[dynamic-containers]] +==== Dynamically Creating Containers + +There are several techniques that can be used to create listener containers at runtime. +This section explores some of those techniques. + +===== MessageListener Implementations + +If you implement your own listener directly, you can simply use the container factory to create a raw container for that listener: + +.User Listener +==== +[source, java, role="primary", indent=0] +.Java +---- +include::{java-examples}/dynamic/MyListener.java[tag=listener] +include::{java-examples}/dynamic/Application.java[tag=create] +---- +[source, kotlin, role="secondary",indent=0] +.Kotlin +---- +include::{kotlin-examples}/dynamic/Application.kt[tag=listener] +include::{kotlin-examples}/dynamic/Application.kt[tag=create] +---- +==== + +===== Prototype Beans + +Containers for methods annotated with `@KafkaListener` can be created dynamically by declaring the bean as prototype: + +.Prototype +==== +[source, java, role="primary", indent=0] +.Java +---- +include::{java-examples}/dynamic/MyPojo.java[tag=pojo] +include::{java-examples}/dynamic/Application.java[tag=pojoBean] +include::{java-examples}/dynamic/Application.java[tag=getBeans] +---- +[source, kotlin, role="secondary",indent=0] +.Kotlin +---- +include::{kotlin-examples}/dynamic/Application.kt[tag=pojo] +include::{kotlin-examples}/dynamic/Application.kt[tag=pojoBean] +include::{kotlin-examples}/dynamic/Application.kt[tag=getBeans] +---- +==== + +IMPORTANT: Listeners must have unique IDs. +Starting with version 2.8.9, the `KafkaListenerEndpointRegistry` has a new method `unregisterListenerContainer(String id)` to allow you to re-use an id. +Unregistering a container does not `stop()` the container, you must do that yourself. + [[events]] ==== Application Events diff --git a/spring-kafka-docs/src/main/java/org/springframework/kafka/jdocs/dynamic/Application.java b/spring-kafka-docs/src/main/java/org/springframework/kafka/jdocs/dynamic/Application.java new file mode 100644 index 0000000000..d975c87e45 --- /dev/null +++ b/spring-kafka-docs/src/main/java/org/springframework/kafka/jdocs/dynamic/Application.java @@ -0,0 +1,103 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.jdocs.dynamic; + +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Scope; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.TopicBuilder; +import org.springframework.kafka.core.KafkaAdmin; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; + +/** + * Dynamic listeners. + * + * @author Gary Russell + * @since 2.8.9 + * + */ +@SpringBootApplication +public class Application { + + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } + + @Bean + ApplicationRunner runner(ConcurrentKafkaListenerContainerFactory factory) { + return args -> { + createContainer(factory, "topic1", "group1"); + }; + } + + @Bean + public ApplicationRunner runner1(ApplicationContext applicationContext) { + return args -> { +// tag::getBeans[] + +applicationContext.getBean(MyPojo.class, "one", "topic2"); +applicationContext.getBean(MyPojo.class, "two", "topic3"); +// end::getBeans[] + }; + } + + +// tag::create[] + +private ConcurrentMessageListenerContainer createContainer( + ConcurrentKafkaListenerContainerFactory factory, String topic, String group) { + + ConcurrentMessageListenerContainer container = factory.createContainer(topic); + container.getContainerProperties().setMessageListener(new MyListener()); + container.getContainerProperties().setGroupId(group); + container.setBeanName(group); + container.start(); + return container; +} +// end::create[] +@Bean +public KafkaAdmin.NewTopics topics() { + return new KafkaAdmin.NewTopics( + TopicBuilder.name("topic1") + .partitions(10) + .replicas(1) + .build(), + TopicBuilder.name("topic2") + .partitions(10) + .replicas(1) + .build(), + TopicBuilder.name("topic3") + .partitions(10) + .replicas(1) + .build()); +} + +// tag::pojoBean[] + +@Bean +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +MyPojo pojo(String id, String topic) { + return new MyPojo(id, topic); +} +//end::pojoBean[] + +} diff --git a/spring-kafka-docs/src/main/java/org/springframework/kafka/jdocs/dynamic/MyListener.java b/spring-kafka-docs/src/main/java/org/springframework/kafka/jdocs/dynamic/MyListener.java new file mode 100644 index 0000000000..7bb53b4b6c --- /dev/null +++ b/spring-kafka-docs/src/main/java/org/springframework/kafka/jdocs/dynamic/MyListener.java @@ -0,0 +1,39 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.jdocs.dynamic; + +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import org.springframework.kafka.listener.MessageListener; + +/** + * {@link MessageListener} for dynamic containers. + * @author Gary Russell + * @since 2.8.9 + * + */ +//tag::listener[] + +public class MyListener implements MessageListener { + + @Override + public void onMessage(ConsumerRecord data) { + // ... + } + +} +// end::listener[] diff --git a/spring-kafka-docs/src/main/java/org/springframework/kafka/jdocs/dynamic/MyPojo.java b/spring-kafka-docs/src/main/java/org/springframework/kafka/jdocs/dynamic/MyPojo.java new file mode 100644 index 0000000000..8fe90e7c9b --- /dev/null +++ b/spring-kafka-docs/src/main/java/org/springframework/kafka/jdocs/dynamic/MyPojo.java @@ -0,0 +1,54 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.jdocs.dynamic; + +import org.springframework.kafka.annotation.KafkaListener; + +/** + * Pojo for dynamic listener creation. + * @author Gary Russell + * @since 2.8.9 + * + */ +//tag::pojo[] + +public class MyPojo { + + private final String id; + + private final String topic; + + public MyPojo(String id, String topic) { + this.id = id; + this.topic = topic; + } + + public String getId() { + return this.id; + } + + public String getTopic() { + return this.topic; + } + + @KafkaListener(id = "#{__listener.id}", topics = "#{__listener.topic}") + public void listen(String in) { + System.out.println(in); + } + +} +// end::pojo[] diff --git a/spring-kafka-docs/src/main/kotlin/org/springframework/kafka/kdocs/dynamic/Application.kt b/spring-kafka-docs/src/main/kotlin/org/springframework/kafka/kdocs/dynamic/Application.kt new file mode 100644 index 0000000000..6350987ee0 --- /dev/null +++ b/spring-kafka-docs/src/main/kotlin/org/springframework/kafka/kdocs/dynamic/Application.kt @@ -0,0 +1,125 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.kafka.kdocs.dynamic + +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.springframework.beans.factory.config.ConfigurableBeanFactory +import org.springframework.boot.ApplicationArguments +import org.springframework.boot.ApplicationRunner +import org.springframework.boot.SpringApplication +import org.springframework.boot.autoconfigure.SpringBootApplication +import org.springframework.context.ApplicationContext +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Scope +import org.springframework.kafka.annotation.KafkaListener +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory +import org.springframework.kafka.config.TopicBuilder +import org.springframework.kafka.core.KafkaAdmin.NewTopics +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer +import org.springframework.kafka.listener.MessageListener + +/** + * @author Gary Russell + * @since 2.8.9 + */ +@SpringBootApplication +class Application { + @Bean + fun runner(factory: ConcurrentKafkaListenerContainerFactory): ApplicationRunner { + return ApplicationRunner { args: ApplicationArguments? -> createContainer(factory, "topic1", "group1") } + } + + @Bean + fun runner(applicationContext: ApplicationContext): ApplicationRunner { + return ApplicationRunner { args: ApplicationArguments? -> +// tag::getBeans[] + +applicationContext.getBean(MyPojo::class.java, "one", arrayOf("topic2")) +applicationContext.getBean(MyPojo::class.java, "two", arrayOf("topic3")) +// end::getBeans[] + } + } + +// tag::create[] + +private fun createContainer( + factory: ConcurrentKafkaListenerContainerFactory, topic: String, group: String +): ConcurrentMessageListenerContainer { + val container = factory.createContainer(topic) + container.containerProperties.messageListener = MyListener() + container.containerProperties.groupId = group + container.beanName = group + container.start() + return container +} +// end::create[] + @Bean + fun topics(): NewTopics { + return NewTopics( + TopicBuilder.name("topic1") + .partitions(10) + .replicas(1) + .build(), + TopicBuilder.name("topic2") + .partitions(10) + .replicas(1) + .build(), + TopicBuilder.name("topic3") + .partitions(10) + .replicas(1) + .build() + ) + } + +// tag::pojoBean[] + +@Bean +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +fun pojo(id: String?, topic: String?): MyPojo { + return MyPojo(id, topic) +} +//end::pojoBean[] + + companion object { + @JvmStatic + fun main(args: Array) { + SpringApplication.run(Application::class.java, *args).close() + } + } +} + +// tag::listener[] + +class MyListener : MessageListener { + + override fun onMessage(data: ConsumerRecord) { + // ... + } + +} +// end::listener[] + +// tag::pojo[] + +class MyPojo(id: String?, topic: String?) { + + @KafkaListener(id = "#{__listener.id}", topics = ["#{__listener.topics}"]) + fun listen(`in`: String?) { + println(`in`) + } + +} +// end::pojo[] diff --git a/spring-kafka-docs/src/main/resources/logback.xml b/spring-kafka-docs/src/main/resources/logback.xml index 6ccfbe90ef..4ed234e458 100644 --- a/spring-kafka-docs/src/main/resources/logback.xml +++ b/spring-kafka-docs/src/main/resources/logback.xml @@ -12,7 +12,7 @@ - + diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java index d112b162b6..5ae4c4e2c1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java @@ -242,6 +242,22 @@ public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListe } } + /** + * Unregister the listener container with the provided id. + *

+ * IMPORTANT: this method simply removes the container from the registry. It does NOT + * call any {@link org.springframework.context.Lifecycle} or {@link DisposableBean} + * methods; you need to call them before or after calling this method to shut down the + * container. + * @param id the id. + * @return the container, if it was registered; null otherwise. + * @since 2.8.9 + */ + @Nullable + public MessageListenerContainer unregisterListenerContainer(String id) { + return this.listenerContainers.remove(id); + } + /** * Create and start a new {@link MessageListenerContainer} using the specified factory. * @param endpoint the endpoint to create a {@link MessageListenerContainer}. diff --git a/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaListenerEndpointRegistryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaListenerEndpointRegistryTests.java new file mode 100644 index 0000000000..bccf4bd6de --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaListenerEndpointRegistryTests.java @@ -0,0 +1,50 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.config; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; + +import org.junit.jupiter.api.Test; + +import org.springframework.kafka.listener.MessageListenerContainer; + +/** + * @author Gary Russell + * @since 2.8.9 + * + */ +public class KafkaListenerEndpointRegistryTests { + + @Test + void unregister() { + KafkaListenerEndpointRegistry registry = new KafkaListenerEndpointRegistry(); + KafkaListenerEndpoint endpoint = mock(KafkaListenerEndpoint.class); + @SuppressWarnings("unchecked") + KafkaListenerContainerFactory factory = mock(KafkaListenerContainerFactory.class); + given(endpoint.getId()).willReturn("foo"); + MessageListenerContainer container = mock(MessageListenerContainer.class); + given(factory.createListenerContainer(endpoint)).willReturn(container); + registry.registerListenerContainer(endpoint, factory); + MessageListenerContainer unregistered = registry.unregisterListenerContainer("foo"); + assertThat(unregistered).isSameAs(container); + registry.registerListenerContainer(endpoint, factory); + assertThat(unregistered).isSameAs(container); + } + +}