From 446b04078696b4559bfe805341e0b6f336f77c86 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Sat, 15 Feb 2025 20:30:20 -0500 Subject: [PATCH 1/2] GH-3741: Fix metric tag to show underlying exception type Fixes: #3741 Issue: https://github.com/spring-projects/spring-kafka/issues/3741 When exceptions occur in Kafka listeners, the metrics currently show `ListenerExecutionFailedException` in both the `error` tag (when using observation) and `exception` tag (when using micrometer without observation), rather than the actual underlying exception. * Modify ListenerContainer to pass actual exception to failure metrics * Update MessagingMessageListenerAdapter to report cause to observation * Add MicrometerMetricsTests to verify both observation and non-observation metrics * Fix ObservationTests to verify correct error reporting in metrics This ensures metrics show the actual underlying exception while maintaining existing span behavior. Signed-off-by: Soby Chacko **Auto-cherry-pick to `3.3.x` & `3.2.x`** --- .../KafkaMessageListenerContainer.java | 15 +- .../MessagingMessageListenerAdapter.java | 2 +- .../micrometer/MicrometerMetricsTests.java | 210 ++++++++++++++++++ .../support/micrometer/ObservationTests.java | 11 +- 4 files changed, 229 insertions(+), 9 deletions(-) create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/MicrometerMetricsTests.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index b655eb5b49..a859cce0bb 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -2357,13 +2357,18 @@ private void successTimer(@Nullable Object sample, @Nullable ConsumerRecord record) { + private void failureTimer(@Nullable Object sample, @Nullable ConsumerRecord record, + Throwable exception) { if (sample != null) { + String exceptionName = exception.getCause() != null + ? exception.getCause().getClass().getSimpleName() + : exception.getClass().getSimpleName(); + if (this.micrometerTagsProvider == null || record == null) { - this.micrometerHolder.failure(sample, "ListenerExecutionFailedException"); + this.micrometerHolder.failure(sample, exceptionName); } else { - this.micrometerHolder.failure(sample, "ListenerExecutionFailedException", record); + this.micrometerHolder.failure(sample, exceptionName, record); } } } @@ -2441,7 +2446,7 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords r } catch (RuntimeException e) { this.batchFailed = true; - failureTimer(sample, null); + failureTimer(sample, null, e); batchInterceptAfter(records, e); throw e; } @@ -2776,7 +2781,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord cReco recordInterceptAfter(cRecord, null); } catch (RuntimeException e) { - failureTimer(sample, cRecord); + failureTimer(sample, cRecord, e); recordInterceptAfter(cRecord, e); if (!isListenerAdapterObservationAware()) { observation.error(e); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 59982984cb..68c6d612f2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -425,7 +425,7 @@ protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, C } catch (ListenerExecutionFailedException e) { listenerError = e; - currentObservation.error(e); + currentObservation.error(e.getCause() != null ? e.getCause() : e); handleException(records, acknowledgment, consumer, message, e); } catch (Error e) { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/MicrometerMetricsTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/MicrometerMetricsTests.java new file mode 100644 index 0000000000..b42df3ff02 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/MicrometerMetricsTests.java @@ -0,0 +1,210 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support.micrometer; + +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Timer; +import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import io.micrometer.observation.ObservationRegistry; +import io.micrometer.tracing.test.simple.SimpleTracer; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaAdmin; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Soby Chacko + * @since 3.3.3 + */ +@SpringJUnitConfig +@EmbeddedKafka(topics = { MicrometerMetricsTests.METRICS_TEST_TOPIC }, partitions = 1) +@DirtiesContext +public class MicrometerMetricsTests { + + public final static String METRICS_TEST_TOPIC = "metrics.test.topic"; + + @Test + void verifyMetricsWithoutObservation(@Autowired MetricsListener listener, + @Autowired MeterRegistry meterRegistry, + @Autowired KafkaTemplate template) + throws Exception { + + template.send(METRICS_TEST_TOPIC, "test").get(10, TimeUnit.SECONDS); + assertThat(listener.latch.await(10, TimeUnit.SECONDS)).isTrue(); + + Timer timer = meterRegistry.find("spring.kafka.listener") + .tags("name", "metricsTest-0") + .tag("result", "failure") + .timer(); + + assertThat(timer).isNotNull(); + assertThat(timer.getId().getTag("exception")) + .isEqualTo("IllegalStateException"); + } + + @Test + void verifyMetricsWithObservation(@Autowired ObservationListener observationListener, + @Autowired MeterRegistry meterRegistry, + @Autowired KafkaTemplate template) + throws Exception { + + template.send(METRICS_TEST_TOPIC, "test").get(10, TimeUnit.SECONDS); + assertThat(observationListener.latch.await(10, TimeUnit.SECONDS)).isTrue(); + + Timer timer = meterRegistry.find("spring.kafka.listener") + .tag("spring.kafka.listener.id", "observationTest-0") + .tag("error", "IllegalStateException") + .timer(); + + assertThat(timer).isNotNull(); + } + + @Configuration + @EnableKafka + static class Config { + + @Bean + KafkaAdmin admin(EmbeddedKafkaBroker broker) { + return new KafkaAdmin(Map.of( + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, + broker.getBrokersAsString())); + } + + @Bean + ProducerFactory producerFactory(EmbeddedKafkaBroker broker) { + return new DefaultKafkaProducerFactory<>( + KafkaTestUtils.producerProps(broker)); + } + + @Bean + ConsumerFactory consumerFactory(EmbeddedKafkaBroker broker) { + return new DefaultKafkaConsumerFactory<>( + KafkaTestUtils.consumerProps("metrics", "false", broker)); + } + + @Bean + KafkaTemplate template(ProducerFactory pf) { + return new KafkaTemplate<>(pf); + } + + @Bean + ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + ConsumerFactory cf) { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(cf); + factory.getContainerProperties().setMicrometerEnabled(true); + factory.getContainerProperties().setObservationEnabled(false); + return factory; + } + + @Bean + ConcurrentKafkaListenerContainerFactory observationListenerContainerFactory( + ConsumerFactory cf, ObservationRegistry observationRegistry) { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(cf); + factory.getContainerProperties().setObservationEnabled(true); + factory.getContainerProperties().setObservationRegistry(observationRegistry); + return factory; + } + + @Bean + MetricsListener metricsListener() { + return new MetricsListener(); + } + + @Bean + MeterRegistry meterRegistry() { + return new SimpleMeterRegistry(); + } + + @Bean + ObservationListener observationListener() { + return new ObservationListener(); + } + + @Bean + SimpleTracer simpleTracer() { + return new SimpleTracer(); + } + + @Bean + ObservationRegistry observationRegistry(MeterRegistry meterRegistry) { + ObservationRegistry observationRegistry = ObservationRegistry.create(); + observationRegistry.observationConfig() + .observationHandler(new DefaultMeterObservationHandler(meterRegistry)); + return observationRegistry; + } + } + + static class MetricsListener { + final CountDownLatch latch = new CountDownLatch(1); + + @KafkaListener(id = "metricsTest", topics = METRICS_TEST_TOPIC) + void listen(ConsumerRecord in) { + try { + throw new IllegalStateException("metrics test exception"); + } + finally { + latch.countDown(); + } + } + } + + static class ObservationListener { + final CountDownLatch latch = new CountDownLatch(1); + + @KafkaListener(id = "observationTest", + topics = METRICS_TEST_TOPIC, + containerFactory = "observationListenerContainerFactory") + void listen(ConsumerRecord in) { + try { + throw new IllegalStateException("observation test exception"); + } + finally { + latch.countDown(); + } + } + } + +} + diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java index 5177ba3823..99442dd2ba 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java @@ -359,8 +359,9 @@ private void assertThatAdmin(Object object, KafkaAdmin admin, String brokersStri @Test void observationRuntimeException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer, @Autowired @Qualifier("throwableTemplate") KafkaTemplate runtimeExceptionTemplate, - @Autowired KafkaListenerEndpointRegistry endpointRegistry, @Autowired Config config) - throws ExecutionException, InterruptedException, TimeoutException { + @Autowired KafkaListenerEndpointRegistry endpointRegistry, + @Autowired MeterRegistry meterRegistry, @Autowired Config config) + throws ExecutionException, InterruptedException, TimeoutException { runtimeExceptionTemplate.send(OBSERVATION_RUNTIME_EXCEPTION, "testRuntimeException").get(10, TimeUnit.SECONDS); assertThat(listener.latch4.await(10, TimeUnit.SECONDS)).isTrue(); @@ -372,10 +373,14 @@ void observationRuntimeException(@Autowired ExceptionListener listener, @Autowir assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("throwableTemplate"); span = spans.poll(); assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs4-0"); - assertThat(span.getError().getCause()) + assertThat(span.getError()) .isInstanceOf(IllegalStateException.class) .hasMessage("obs4 run time exception"); + assertThat(meterRegistry.get("spring.kafka.listener") + .tag("error", "IllegalStateException") + .timer().count()).isEqualTo(1); + assertThat(config.scopeInFailureReference.get()).isNotNull(); } From 07c2c579b220ee21e7412a902a0b2659450df95c Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Tue, 18 Feb 2025 09:45:44 -0500 Subject: [PATCH 2/2] Addressing PR review --- .../micrometer/MicrometerMetricsTests.java | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/MicrometerMetricsTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/MicrometerMetricsTests.java index b42df3ff02..3e7c61d465 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/MicrometerMetricsTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/MicrometerMetricsTests.java @@ -16,7 +16,6 @@ package org.springframework.kafka.support.micrometer; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -25,8 +24,6 @@ import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.micrometer.observation.ObservationRegistry; -import io.micrometer.tracing.test.simple.SimpleTracer; -import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.Test; @@ -39,7 +36,6 @@ import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.test.EmbeddedKafkaBroker; @@ -52,7 +48,7 @@ /** * @author Soby Chacko - * @since 3.3.3 + * @since 3.2.7 */ @SpringJUnitConfig @EmbeddedKafka(topics = { MicrometerMetricsTests.METRICS_TEST_TOPIC }, partitions = 1) @@ -101,13 +97,6 @@ void verifyMetricsWithObservation(@Autowired ObservationListener observationList @EnableKafka static class Config { - @Bean - KafkaAdmin admin(EmbeddedKafkaBroker broker) { - return new KafkaAdmin(Map.of( - AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, - broker.getBrokersAsString())); - } - @Bean ProducerFactory producerFactory(EmbeddedKafkaBroker broker) { return new DefaultKafkaProducerFactory<>( @@ -162,11 +151,6 @@ ObservationListener observationListener() { return new ObservationListener(); } - @Bean - SimpleTracer simpleTracer() { - return new SimpleTracer(); - } - @Bean ObservationRegistry observationRegistry(MeterRegistry meterRegistry) { ObservationRegistry observationRegistry = ObservationRegistry.create();