Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions spring-kafka-docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1734,19 +1734,24 @@ IMPORTANT: This is available in record listeners and batch listeners that receiv
It is **not** available in a batch listener that receives a `ConsumerRecords<?, ?>` argument.
Use the `KafkaUtils` mechanism in that case.

[[container-thread-naming]]
===== Container Thread Naming

Listener containers currently use two task executors, one to invoke the consumer and another that is used to invoke the listener when the kafka consumer property `enable.auto.commit` is `false`.
You can provide custom executors by setting the `consumerExecutor` and `listenerExecutor` properties of the container's `ContainerProperties`.
A `TaskExecutor` is used to invoke the consumer and the listener.
You can provide a custom executor by setting the `consumerExecutor` property of the container's `ContainerProperties`.
When using pooled executors, be sure that enough threads are available to handle the concurrency across all the containers in which they are used.
When using the `ConcurrentMessageListenerContainer`, a thread from each is used for each consumer (`concurrency`).
When using the `ConcurrentMessageListenerContainer`, a thread from the executor is used for each consumer (`concurrency`).

If you do not provide a consumer executor, a `SimpleAsyncTaskExecutor` is used.
This executor creates threads with names similar to `<beanName>-C-1` (consumer thread).
If you do not provide a consumer executor, a `SimpleAsyncTaskExecutor` is used for each container.
This executor creates threads with names similar to `<beanName>-C-<n>`.
For the `ConcurrentMessageListenerContainer`, the `<beanName>` part of the thread name becomes `<beanName>-m`, where `m` represents the consumer instance.
`n` increments each time the container is started.
So, with a bean name of `container`, threads in this container will be named `container-0-C-1`, `container-1-C-1` etc., after the container is started the first time; `container-0-C-2`, `container-1-C-2` etc., after a stop and subsequent start.

Starting with version `3.0.1`, you can now change the name of the thread, regardless of which executor is used.
Set the `ContainerProperties.changeConsumerThreadName` property to `true` and the `ContainerProperties.threadNameSupplier` will be invoked to obtain the thread name.
This is a `Function<MessageListenerContainer, String>`, with the default implementation returning `container.getListenerId()`.

[[kafka-listener-meta]]
===== `@KafkaListener` as a Meta Annotation

Expand Down
3 changes: 3 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ these methods now require an `ObjectProvider<RetryTopicComponentFactory>` parame
Events related to consumer authentication and authorization failures are now published by the container.
See <<events>> for more information.

You can now customize the thread names used by consumer threads.
See <<container-thread-naming>> for more information.

[[x30-template-changes]]
==== `KafkaTemplate` Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Pattern;

import org.aopalliance.aop.Advice;
Expand All @@ -33,6 +34,7 @@
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.micrometer.KafkaListenerObservationConvention;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.transaction.PlatformTransactionManager;
Expand Down Expand Up @@ -287,6 +289,11 @@ public enum EOSMode {

private KafkaListenerObservationConvention observationConvention;

private boolean changeConsumerThreadName;

@NonNull
private Function<MessageListenerContainer, String> threadNameSupplier = container -> container.getListenerId();

/**
* Create properties for a container that will subscribe to the specified topics.
* @param topics the topics.
Expand Down Expand Up @@ -945,6 +952,48 @@ public void setObservationConvention(KafkaListenerObservationConvention observat
this.observationConvention = observationConvention;
}

/**
* Return true if the container should change the consumer thread name during
* initialization.
* @return true to change.
* @since 3.0.1
*/
public boolean isChangeConsumerThreadName() {
return this.changeConsumerThreadName;
}

/**
* Set to true to instruct the container to change the consumer thread name during
* initialization.
* @param changeConsumerThreadName true to change.
* @since 3.0.1
* @see #setThreadNameSupplier(Function)
*/
public void setChangeConsumerThreadName(boolean changeConsumerThreadName) {
this.changeConsumerThreadName = changeConsumerThreadName;
}

/**
* Return the function used to change the consumer thread name.
* @return the function.
* @since 3.0.1
*/
public Function<MessageListenerContainer, String> getThreadNameSupplier() {
return this.threadNameSupplier;
}

/**
* Set a function used to change the consumer thread name. The default returns the
* container {@code listenerId}.
* @param threadNameSupplier the function.
* @since 3.0.1
* @see #setChangeConsumerThreadName(boolean)
*/
public void setThreadNameSupplier(Function<MessageListenerContainer, String> threadNameSupplier) {
Assert.notNull(threadNameSupplier, "'threadNameSupplier' cannot be null");
this.threadNameSupplier = threadNameSupplier;
}

@Override
public String toString() {
return "ContainerProperties ["
Expand Down Expand Up @@ -981,6 +1030,7 @@ public String toString() {
+ (this.observationConvention != null
? "\n observationConvention=" + this.observationConvention
: "")
+ "\n changeConsumerThreadName=" + this.changeConsumerThreadName
+ "\n]";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1397,6 +1397,10 @@ public void run() {
}

protected void initialize() {
if (this.containerProperties.isChangeConsumerThreadName()) {
Thread.currentThread().setName(
this.containerProperties.getThreadNameSupplier().apply(KafkaMessageListenerContainer.this));
}
publishConsumerStartingEvent();
this.consumerThread = Thread.currentThread();
setupSeeks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ protected Consumer<Integer, String> createKafkaConsumer(String groupId, String c
ContainerProperties containerProps = new ContainerProperties(topic1);
containerProps.setLogContainerConfig(true);
containerProps.setClientId("client");
containerProps.setChangeConsumerThreadName(true);

final CountDownLatch latch = new CountDownLatch(3);
final Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();
Expand Down Expand Up @@ -178,17 +179,15 @@ protected Consumer<Integer, String> createKafkaConsumer(String groupId, String c
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic1);
template.sendDefault(0, "foo");
template.sendDefault(2, "bar");
template.sendDefault(0, "baz");
template.sendDefault(2, "qux");
template.sendDefault(0, 0, "foo");
template.sendDefault(1, 2, "bar");
template.sendDefault(0, 0, "baz");
template.sendDefault(1, 2, "qux");
template.flush();
assertThat(intercepted.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(payloads).containsExactlyInAnyOrder("foo", "bar", "qux");
for (String threadName : listenerThreadNames) {
assertThat(threadName).contains("-C-");
}
assertThat(listenerThreadNames).contains("testAuto-0", "testAuto-1");
List<KafkaMessageListenerContainer<Integer, String>> containers = KafkaTestUtils.getPropertyValue(container,
"containers", List.class);
assertThat(containers).hasSize(2);
Expand Down