Skip to content

GH-2375: Add KLER.unregisterListenerContainer #2376

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2717,6 +2717,59 @@ Also see `interceptBeforeTx`.
|n/a
|A reference to all child `KafkaMessageListenerContainer` s.
|===

[[dynamic-containers]]
==== Dynamically Creating Containers

There are several techniques that can be used to create listener containers at runtime.
This section explores some of those techniques.

===== MessageListener Implementations

If you implement your own listener directly, you can simply use the container factory to create a raw container for that listener:

.User Listener
====
[source, java, role="primary", indent=0]
.Java
----
include::{java-examples}/dynamic/MyListener.java[tag=listener]
include::{java-examples}/dynamic/Application.java[tag=create]
----
[source, kotlin, role="secondary",indent=0]
.Kotlin
----
include::{kotlin-examples}/dynamic/Application.kt[tag=listener]
include::{kotlin-examples}/dynamic/Application.kt[tag=create]
----
====

===== Prototype Beans

Containers for methods annotated with `@KafkaListener` can be created dynamically by declaring the bean as prototype:

.Prototype
====
[source, java, role="primary", indent=0]
.Java
----
include::{java-examples}/dynamic/MyPojo.java[tag=pojo]
include::{java-examples}/dynamic/Application.java[tag=pojoBean]
include::{java-examples}/dynamic/Application.java[tag=getBeans]
----
[source, kotlin, role="secondary",indent=0]
.Kotlin
----
include::{kotlin-examples}/dynamic/Application.kt[tag=pojo]
include::{kotlin-examples}/dynamic/Application.kt[tag=pojoBean]
include::{kotlin-examples}/dynamic/Application.kt[tag=getBeans]
----
====

IMPORTANT: Listeners must have unique IDs.
Starting with version 2.8.9, the `KafkaListenerEndpointRegistry` has a new method `unregisterListenerContainer(String id)` to allow you to re-use an id.
Unregistering a container does not `stop()` the container, you must do that yourself.

[[events]]
==== Application Events

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.jdocs.dynamic;

import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Scope;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

/**
* Dynamic listeners.
*
* @author Gary Russell
* @since 2.8.9
*
*/
@SpringBootApplication
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

@Bean
ApplicationRunner runner(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
return args -> {
createContainer(factory, "topic1", "group1");
};
}

@Bean
public ApplicationRunner runner1(ApplicationContext applicationContext) {
return args -> {
// tag::getBeans[]

applicationContext.getBean(MyPojo.class, "one", "topic2");
applicationContext.getBean(MyPojo.class, "two", "topic3");
// end::getBeans[]
};
}


// tag::create[]

private ConcurrentMessageListenerContainer<String, String> createContainer(
ConcurrentKafkaListenerContainerFactory<String, String> factory, String topic, String group) {

ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topic);
container.getContainerProperties().setMessageListener(new MyListener());
container.getContainerProperties().setGroupId(group);
container.setBeanName(group);
container.start();
return container;
}
// end::create[]
@Bean
public KafkaAdmin.NewTopics topics() {
return new KafkaAdmin.NewTopics(
TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build(),
TopicBuilder.name("topic2")
.partitions(10)
.replicas(1)
.build(),
TopicBuilder.name("topic3")
.partitions(10)
.replicas(1)
.build());
}

// tag::pojoBean[]

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
MyPojo pojo(String id, String topic) {
return new MyPojo(id, topic);
}
//end::pojoBean[]

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.jdocs.dynamic;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.listener.MessageListener;

/**
* {@link MessageListener} for dynamic containers.
* @author Gary Russell
* @since 2.8.9
*
*/
//tag::listener[]

public class MyListener implements MessageListener<String, String> {

@Override
public void onMessage(ConsumerRecord<String, String> data) {
// ...
}

}
// end::listener[]
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.jdocs.dynamic;

import org.springframework.kafka.annotation.KafkaListener;

/**
* Pojo for dynamic listener creation.
* @author Gary Russell
* @since 2.8.9
*
*/
//tag::pojo[]

public class MyPojo {

private final String id;

private final String topic;

public MyPojo(String id, String topic) {
this.id = id;
this.topic = topic;
}

public String getId() {
return this.id;
}

public String getTopic() {
return this.topic;
}

@KafkaListener(id = "#{__listener.id}", topics = "#{__listener.topic}")
public void listen(String in) {
System.out.println(in);
}

}
// end::pojo[]
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.kafka.kdocs.dynamic

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.beans.factory.config.ConfigurableBeanFactory
import org.springframework.boot.ApplicationArguments
import org.springframework.boot.ApplicationRunner
import org.springframework.boot.SpringApplication
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.context.ApplicationContext
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Scope
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.config.TopicBuilder
import org.springframework.kafka.core.KafkaAdmin.NewTopics
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer
import org.springframework.kafka.listener.MessageListener

/**
* @author Gary Russell
* @since 2.8.9
*/
@SpringBootApplication
class Application {
@Bean
fun runner(factory: ConcurrentKafkaListenerContainerFactory<String, String>): ApplicationRunner {
return ApplicationRunner { args: ApplicationArguments? -> createContainer(factory, "topic1", "group1") }
}

@Bean
fun runner(applicationContext: ApplicationContext): ApplicationRunner {
return ApplicationRunner { args: ApplicationArguments? ->
// tag::getBeans[]

applicationContext.getBean(MyPojo::class.java, "one", arrayOf("topic2"))
applicationContext.getBean(MyPojo::class.java, "two", arrayOf("topic3"))
// end::getBeans[]
}
}

// tag::create[]

private fun createContainer(
factory: ConcurrentKafkaListenerContainerFactory<String, String>, topic: String, group: String
): ConcurrentMessageListenerContainer<String, String> {
val container = factory.createContainer(topic)
container.containerProperties.messageListener = MyListener()
container.containerProperties.groupId = group
container.beanName = group
container.start()
return container
}
// end::create[]
@Bean
fun topics(): NewTopics {
return NewTopics(
TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build(),
TopicBuilder.name("topic2")
.partitions(10)
.replicas(1)
.build(),
TopicBuilder.name("topic3")
.partitions(10)
.replicas(1)
.build()
)
}

// tag::pojoBean[]

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
fun pojo(id: String?, topic: String?): MyPojo {
return MyPojo(id, topic)
}
//end::pojoBean[]

companion object {
@JvmStatic
fun main(args: Array<String>) {
SpringApplication.run(Application::class.java, *args).close()
}
}
}

// tag::listener[]

class MyListener : MessageListener<String?, String?> {

override fun onMessage(data: ConsumerRecord<String?, String?>) {
// ...
}

}
// end::listener[]

// tag::pojo[]

class MyPojo(id: String?, topic: String?) {

@KafkaListener(id = "#{__listener.id}", topics = ["#{__listener.topics}"])
fun listen(`in`: String?) {
println(`in`)
}

}
// end::pojo[]
2 changes: 1 addition & 1 deletion spring-kafka-docs/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<appender-ref ref="STDOUT" />
</root>

<logger name="org.springframework" level="WARN" />
<logger name="org.springframework" level="INFO" />
<logger name="org.springframework.kafka.jdocs" level="INFO" />
<logger name="org.springframework.kafka.Kdocs" level="INFO" />

Expand Down
Loading