Skip to content

Commit ad09ce3

Browse files
garyrussellartembilan
authored andcommitted
GH-2638: Add Dynamic Micrometer Tags
* GH-2638: Add Dynamic Micrometer Tags Resolves #2638 **cherry-pick to 2.9.x** TODO: Support for Observations when `observationEnabled` (3.0.x only). * Fix docs. # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java # spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java # spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java # spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/MicrometerHolder.java
1 parent b493947 commit ad09ce3

File tree

6 files changed

+232
-28
lines changed

6 files changed

+232
-28
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2590,6 +2590,14 @@ Also see `idleBeforeDataMultiplier`.
25902590
|`true`
25912591
|Whether or not to maintain Micrometer timers for the consumer threads.
25922592

2593+
|[[micrometerTags]]<<micrometerTags,`micrometerTags`>>
2594+
|empty
2595+
|A map of static tags to be added to micrometer metrics.
2596+
2597+
|[[micrometerTagsProvider]]<<micrometerTagsProvider,`micrometerTagsProvider`>>
2598+
|`null`
2599+
|A function that provides dynamic tags, based on the consumer record.
2600+
25932601
|[[missingTopicsFatal]]<<missingTopicsFatal,`missingTopicsFatal`>>
25942602
|`false`
25952603
|When true prevents the container from starting if the confifgured topic(s) are not present on the broker.
@@ -3401,6 +3409,8 @@ The timers are named `spring.kafka.listener` and have the following tags:
34013409

34023410
You can add additional tags using the `ContainerProperties` `micrometerTags` property.
34033411

3412+
Starting with versions 2.9.8, 3.0.6, you can provide a function in `ContainerProperties` `micrometerTagsProvider`; the function receives the `ConsumerRecord<?, ?>` and returns tags which can be based on that record, and merged with any static tags in `micrometerTags`.
3413+
34043414
NOTE: With the concurrent container, timers are created for each thread and the `name` tag is suffixed with `-n` where n is `0` to `concurrency-1`.
34053415

34063416
===== Monitoring KafkaTemplate Performance
@@ -3418,6 +3428,8 @@ The timers are named `spring.kafka.template` and have the following tags:
34183428

34193429
You can add additional tags using the template's `micrometerTags` property.
34203430

3431+
Starting with versions 2.9.8, 3.0.6, you can provide a `KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>)` property; the function receives the `ProducerRecord<?, ?>` and returns tags which can be based on that record, and merged with any static tags in `micrometerTags`.
3432+
34213433
[[micrometer-native]]
34223434
===== Micrometer Native Metrics
34233435

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 65 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.Properties;
2828
import java.util.concurrent.ExecutionException;
2929
import java.util.concurrent.Future;
30+
import java.util.function.Function;
3031

3132
import org.apache.commons.logging.LogFactory;
3233
import org.apache.kafka.clients.consumer.Consumer;
@@ -130,6 +131,8 @@ public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationCo
130131

131132
private volatile MicrometerHolder micrometerHolder;
132133

134+
@Nullable
135+
private Function<ProducerRecord<?, ?>, Map<String, String>> micrometerTagsProvider;
133136
/**
134137
* Create an instance using the supplied producer factory and autoFlush false.
135138
* @param producerFactory the producer factory.
@@ -341,6 +344,33 @@ public void setMicrometerTags(Map<String, String> tags) {
341344
}
342345
}
343346

347+
/**
348+
* Set a function to provide dynamic tags based on the producer record. These tags
349+
* will be added to any static tags provided in {@link #setMicrometerTags(Map)
350+
* micrometerTags}. Only applies to record listeners, ignored for batch listeners.
351+
* Does not apply if observation is enabled.
352+
* @param micrometerTagsProvider the micrometerTagsProvider.
353+
* @since 2.9.8
354+
* @see #setMicrometerEnabled(boolean)
355+
* @see #setMicrometerTags(Map)
356+
* @see #setObservationEnabled(boolean)
357+
*/
358+
public void setMicrometerTagsProvider(
359+
@Nullable Function<ProducerRecord<?, ?>, Map<String, String>> micrometerTagsProvider) {
360+
361+
this.micrometerTagsProvider = micrometerTagsProvider;
362+
}
363+
364+
/**
365+
* Return the Micrometer tags provider.
366+
* @return the micrometerTagsProvider.
367+
* @since 2.9.8
368+
*/
369+
@Nullable
370+
public Function<ProducerRecord<?, ?>, Map<String, String>> getMicrometerTagsProvider() {
371+
return this.micrometerTagsProvider;
372+
}
373+
344374
/**
345375
* Return the producer factory used by this template.
346376
* @return the factory.
@@ -680,9 +710,7 @@ private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final
680710
return (metadata, exception) -> {
681711
try {
682712
if (exception == null) {
683-
if (sample != null) {
684-
this.micrometerHolder.success(sample);
685-
}
713+
successTimer(sample, producerRecord);
686714
future.set(new SendResult<>(producerRecord, metadata));
687715
if (KafkaTemplate.this.producerListener != null) {
688716
KafkaTemplate.this.producerListener.onSuccess(producerRecord, metadata);
@@ -691,9 +719,7 @@ private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final
691719
+ ", metadata: " + metadata);
692720
}
693721
else {
694-
if (sample != null) {
695-
this.micrometerHolder.failure(sample, exception.getClass().getSimpleName());
696-
}
722+
failureTimer(sample, exception, producerRecord);
697723
future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
698724
if (KafkaTemplate.this.producerListener != null) {
699725
KafkaTemplate.this.producerListener.onError(producerRecord, metadata, exception);
@@ -710,6 +736,28 @@ private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final
710736
};
711737
}
712738

739+
private void successTimer(@Nullable Object sample, ProducerRecord<?, ?> record) {
740+
if (sample != null) {
741+
if (this.micrometerTagsProvider == null) {
742+
this.micrometerHolder.success(sample);
743+
}
744+
else {
745+
this.micrometerHolder.success(sample, record);
746+
}
747+
}
748+
}
749+
750+
private void failureTimer(@Nullable Object sample, Exception exception, ProducerRecord<?, ?> record) {
751+
if (sample != null) {
752+
if (this.micrometerTagsProvider == null) {
753+
this.micrometerHolder.failure(sample, exception.getClass().getSimpleName());
754+
}
755+
else {
756+
this.micrometerHolder.failure(sample, exception.getClass().getSimpleName(), record);
757+
}
758+
}
759+
}
760+
713761

714762
/**
715763
* Return true if the template is currently running in a transaction on the calling
@@ -767,9 +815,18 @@ private MicrometerHolder obtainMicrometerHolder() {
767815
MicrometerHolder holder = null;
768816
try {
769817
if (KafkaUtils.MICROMETER_PRESENT) {
818+
Function<Object, Map<String, String>> mergedProvider = cr -> this.micrometerTags;
819+
if (this.micrometerTagsProvider != null) {
820+
mergedProvider = cr -> {
821+
Map<String, String> tags = new HashMap<>(this.micrometerTags);
822+
if (cr != null) {
823+
tags.putAll(this.micrometerTagsProvider.apply((ProducerRecord<?, ?>) cr));
824+
}
825+
return tags;
826+
};
827+
}
770828
holder = new MicrometerHolder(this.applicationContext, this.beanName,
771-
"spring.kafka.template", "KafkaTemplate Timer",
772-
this.micrometerTags);
829+
"spring.kafka.template", "KafkaTemplate Timer", mergedProvider);
773830
}
774831
}
775832
catch (@SuppressWarnings("unused") IllegalStateException ex) {

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@
2323
import java.util.HashMap;
2424
import java.util.List;
2525
import java.util.Map;
26+
import java.util.function.Function;
2627
import java.util.regex.Pattern;
2728

2829
import org.aopalliance.aop.Advice;
30+
import org.apache.kafka.clients.consumer.ConsumerRecord;
2931

3032
import org.springframework.aop.framework.Advised;
3133
import org.springframework.aop.framework.ProxyFactory;
@@ -229,6 +231,9 @@ public EOSMode getMode() {
229231

230232
private final List<Advice> adviceChain = new ArrayList<>();
231233

234+
@Nullable
235+
private Function<ConsumerRecord<?, ?>, Map<String, String>> micrometerTagsProvider;
236+
232237
/**
233238
* The ack mode to use when auto ack (in the configuration properties) is false.
234239
* <ul>
@@ -732,10 +737,42 @@ public void setMicrometerTags(Map<String, String> tags) {
732737
}
733738
}
734739

740+
/**
741+
* Return static Micrometer tags.
742+
* @return the tags.
743+
* @since 2.3
744+
*/
735745
public Map<String, String> getMicrometerTags() {
736746
return Collections.unmodifiableMap(this.micrometerTags);
737747
}
738748

749+
/**
750+
* Set a function to provide dynamic tags based on the consumer record. These tags
751+
* will be added to any static tags provided in {@link #setMicrometerTags(Map)
752+
* micrometerTags}. Only applies to record listeners, ignored for batch listeners.
753+
* Does not apply if observation is enabled.
754+
* @param micrometerTagsProvider the micrometerTagsProvider.
755+
* @since 2.9.8
756+
* @see #setMicrometerEnabled(boolean)
757+
* @see #setMicrometerTags(Map)
758+
* @see #setObservationEnabled(boolean)
759+
*/
760+
public void setMicrometerTagsProvider(
761+
@Nullable Function<ConsumerRecord<?, ?>, Map<String, String>> micrometerTagsProvider) {
762+
763+
this.micrometerTagsProvider = micrometerTagsProvider;
764+
}
765+
766+
/**
767+
* Return the Micrometer tags provider.
768+
* @return the micrometerTagsProvider.
769+
* @since 2.9.8
770+
*/
771+
@Nullable
772+
public Function<ConsumerRecord<?, ?>, Map<String, String>> getMicrometerTagsProvider() {
773+
return this.micrometerTagsProvider;
774+
}
775+
739776
public Duration getConsumerStartTimeout() {
740777
return this.consumerStartTimeout;
741778
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.concurrent.ScheduledFuture;
4343
import java.util.concurrent.TimeUnit;
4444
import java.util.concurrent.atomic.AtomicBoolean;
45+
import java.util.function.Function;
4546
import java.util.regex.Pattern;
4647
import java.util.stream.Collectors;
4748

@@ -752,6 +753,9 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
752753

753754
private final boolean pauseImmediate = this.containerProperties.isPauseImmediate();
754755

756+
@Nullable
757+
private final Function<ConsumerRecord<?, ?>, Map<String, String>> micrometerTagsProvider =
758+
this.containerProperties.getMicrometerTagsProvider();
755759
private Map<TopicPartition, OffsetMetadata> definedPartitions;
756760

757761
private int count;
@@ -1230,9 +1234,19 @@ private MicrometerHolder obtainMicrometerHolder() {
12301234
MicrometerHolder holder = null;
12311235
try {
12321236
if (KafkaUtils.MICROMETER_PRESENT && this.containerProperties.isMicrometerEnabled()) {
1237+
Function<Object, Map<String, String>> mergedProvider =
1238+
cr -> this.containerProperties.getMicrometerTags();
1239+
if (this.micrometerTagsProvider != null) {
1240+
mergedProvider = cr -> {
1241+
Map<String, String> tags = new HashMap<>(this.containerProperties.getMicrometerTags());
1242+
if (cr != null) {
1243+
tags.putAll(this.micrometerTagsProvider.apply((ConsumerRecord<?, ?>) cr));
1244+
}
1245+
return tags;
1246+
};
1247+
}
12331248
holder = new MicrometerHolder(getApplicationContext(), getBeanName(),
1234-
"spring.kafka.listener", "Kafka Listener Timer",
1235-
this.containerProperties.getMicrometerTags());
1249+
"spring.kafka.listener", "Kafka Listener Timer", mergedProvider);
12361250
}
12371251
}
12381252
catch (@SuppressWarnings(UNUSED) IllegalStateException ex) {
@@ -2211,7 +2225,7 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
22112225
try {
22122226
invokeBatchOnMessage(records, recordList);
22132227
batchInterceptAfter(records, null);
2214-
successTimer(sample);
2228+
successTimer(sample, null);
22152229
if (this.batchFailed) {
22162230
this.batchFailed = false;
22172231
if (this.commonErrorHandler != null) {
@@ -2224,7 +2238,7 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
22242238
}
22252239
}
22262240
catch (RuntimeException e) {
2227-
failureTimer(sample);
2241+
failureTimer(sample, null);
22282242
batchInterceptAfter(records, e);
22292243
if (this.commonErrorHandler == null) {
22302244
throw e;
@@ -2302,15 +2316,25 @@ private Object startMicrometerSample() {
23022316
return null;
23032317
}
23042318

2305-
private void successTimer(@Nullable Object sample) {
2319+
private void successTimer(@Nullable Object sample, @Nullable ConsumerRecord<?, ?> record) {
23062320
if (sample != null) {
2307-
this.micrometerHolder.success(sample);
2321+
if (this.micrometerTagsProvider == null || record == null) {
2322+
this.micrometerHolder.success(sample);
2323+
}
2324+
else {
2325+
this.micrometerHolder.success(sample, record);
2326+
}
23082327
}
23092328
}
23102329

2311-
private void failureTimer(@Nullable Object sample) {
2330+
private void failureTimer(@Nullable Object sample, @Nullable ConsumerRecord<?, ?> record) {
23122331
if (sample != null) {
2313-
this.micrometerHolder.failure(sample, "ListenerExecutionFailedException");
2332+
if (this.micrometerTagsProvider == null || record == null) {
2333+
this.micrometerHolder.failure(sample, "ListenerExecutionFailedException");
2334+
}
2335+
else {
2336+
this.micrometerHolder.failure(sample, "ListenerExecutionFailedException", record);
2337+
}
23142338
}
23152339
}
23162340

@@ -2698,11 +2722,11 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> recor
26982722

26992723
try {
27002724
invokeOnMessage(record);
2701-
successTimer(sample);
2725+
successTimer(sample, record);
27022726
recordInterceptAfter(record, null);
27032727
}
27042728
catch (RuntimeException e) {
2705-
failureTimer(sample);
2729+
failureTimer(sample, record);
27062730
recordInterceptAfter(record, e);
27072731
if (this.commonErrorHandler == null) {
27082732
throw e;

0 commit comments

Comments
 (0)