diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index b655eb5b49..a859cce0bb 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -2357,13 +2357,18 @@ private void successTimer(@Nullable Object sample, @Nullable ConsumerRecord record) { + private void failureTimer(@Nullable Object sample, @Nullable ConsumerRecord record, + Throwable exception) { if (sample != null) { + String exceptionName = exception.getCause() != null + ? exception.getCause().getClass().getSimpleName() + : exception.getClass().getSimpleName(); + if (this.micrometerTagsProvider == null || record == null) { - this.micrometerHolder.failure(sample, "ListenerExecutionFailedException"); + this.micrometerHolder.failure(sample, exceptionName); } else { - this.micrometerHolder.failure(sample, "ListenerExecutionFailedException", record); + this.micrometerHolder.failure(sample, exceptionName, record); } } } @@ -2441,7 +2446,7 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords r } catch (RuntimeException e) { this.batchFailed = true; - failureTimer(sample, null); + failureTimer(sample, null, e); batchInterceptAfter(records, e); throw e; } @@ -2776,7 +2781,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord cReco recordInterceptAfter(cRecord, null); } catch (RuntimeException e) { - failureTimer(sample, cRecord); + failureTimer(sample, cRecord, e); recordInterceptAfter(cRecord, e); if (!isListenerAdapterObservationAware()) { observation.error(e); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 59982984cb..68c6d612f2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -425,7 +425,7 @@ protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, C } catch (ListenerExecutionFailedException e) { listenerError = e; - currentObservation.error(e); + currentObservation.error(e.getCause() != null ? e.getCause() : e); handleException(records, acknowledgment, consumer, message, e); } catch (Error e) { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/MicrometerMetricsTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/MicrometerMetricsTests.java new file mode 100644 index 0000000000..3e7c61d465 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/MicrometerMetricsTests.java @@ -0,0 +1,194 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support.micrometer; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Timer; +import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import io.micrometer.observation.ObservationRegistry; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Soby Chacko + * @since 3.2.7 + */ +@SpringJUnitConfig +@EmbeddedKafka(topics = { MicrometerMetricsTests.METRICS_TEST_TOPIC }, partitions = 1) +@DirtiesContext +public class MicrometerMetricsTests { + + public final static String METRICS_TEST_TOPIC = "metrics.test.topic"; + + @Test + void verifyMetricsWithoutObservation(@Autowired MetricsListener listener, + @Autowired MeterRegistry meterRegistry, + @Autowired KafkaTemplate template) + throws Exception { + + template.send(METRICS_TEST_TOPIC, "test").get(10, TimeUnit.SECONDS); + assertThat(listener.latch.await(10, TimeUnit.SECONDS)).isTrue(); + + Timer timer = meterRegistry.find("spring.kafka.listener") + .tags("name", "metricsTest-0") + .tag("result", "failure") + .timer(); + + assertThat(timer).isNotNull(); + assertThat(timer.getId().getTag("exception")) + .isEqualTo("IllegalStateException"); + } + + @Test + void verifyMetricsWithObservation(@Autowired ObservationListener observationListener, + @Autowired MeterRegistry meterRegistry, + @Autowired KafkaTemplate template) + throws Exception { + + template.send(METRICS_TEST_TOPIC, "test").get(10, TimeUnit.SECONDS); + assertThat(observationListener.latch.await(10, TimeUnit.SECONDS)).isTrue(); + + Timer timer = meterRegistry.find("spring.kafka.listener") + .tag("spring.kafka.listener.id", "observationTest-0") + .tag("error", "IllegalStateException") + .timer(); + + assertThat(timer).isNotNull(); + } + + @Configuration + @EnableKafka + static class Config { + + @Bean + ProducerFactory producerFactory(EmbeddedKafkaBroker broker) { + return new DefaultKafkaProducerFactory<>( + KafkaTestUtils.producerProps(broker)); + } + + @Bean + ConsumerFactory consumerFactory(EmbeddedKafkaBroker broker) { + return new DefaultKafkaConsumerFactory<>( + KafkaTestUtils.consumerProps("metrics", "false", broker)); + } + + @Bean + KafkaTemplate template(ProducerFactory pf) { + return new KafkaTemplate<>(pf); + } + + @Bean + ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + ConsumerFactory cf) { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(cf); + factory.getContainerProperties().setMicrometerEnabled(true); + factory.getContainerProperties().setObservationEnabled(false); + return factory; + } + + @Bean + ConcurrentKafkaListenerContainerFactory observationListenerContainerFactory( + ConsumerFactory cf, ObservationRegistry observationRegistry) { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(cf); + factory.getContainerProperties().setObservationEnabled(true); + factory.getContainerProperties().setObservationRegistry(observationRegistry); + return factory; + } + + @Bean + MetricsListener metricsListener() { + return new MetricsListener(); + } + + @Bean + MeterRegistry meterRegistry() { + return new SimpleMeterRegistry(); + } + + @Bean + ObservationListener observationListener() { + return new ObservationListener(); + } + + @Bean + ObservationRegistry observationRegistry(MeterRegistry meterRegistry) { + ObservationRegistry observationRegistry = ObservationRegistry.create(); + observationRegistry.observationConfig() + .observationHandler(new DefaultMeterObservationHandler(meterRegistry)); + return observationRegistry; + } + } + + static class MetricsListener { + final CountDownLatch latch = new CountDownLatch(1); + + @KafkaListener(id = "metricsTest", topics = METRICS_TEST_TOPIC) + void listen(ConsumerRecord in) { + try { + throw new IllegalStateException("metrics test exception"); + } + finally { + latch.countDown(); + } + } + } + + static class ObservationListener { + final CountDownLatch latch = new CountDownLatch(1); + + @KafkaListener(id = "observationTest", + topics = METRICS_TEST_TOPIC, + containerFactory = "observationListenerContainerFactory") + void listen(ConsumerRecord in) { + try { + throw new IllegalStateException("observation test exception"); + } + finally { + latch.countDown(); + } + } + } + +} + diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java index 5177ba3823..99442dd2ba 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java @@ -359,8 +359,9 @@ private void assertThatAdmin(Object object, KafkaAdmin admin, String brokersStri @Test void observationRuntimeException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer, @Autowired @Qualifier("throwableTemplate") KafkaTemplate runtimeExceptionTemplate, - @Autowired KafkaListenerEndpointRegistry endpointRegistry, @Autowired Config config) - throws ExecutionException, InterruptedException, TimeoutException { + @Autowired KafkaListenerEndpointRegistry endpointRegistry, + @Autowired MeterRegistry meterRegistry, @Autowired Config config) + throws ExecutionException, InterruptedException, TimeoutException { runtimeExceptionTemplate.send(OBSERVATION_RUNTIME_EXCEPTION, "testRuntimeException").get(10, TimeUnit.SECONDS); assertThat(listener.latch4.await(10, TimeUnit.SECONDS)).isTrue(); @@ -372,10 +373,14 @@ void observationRuntimeException(@Autowired ExceptionListener listener, @Autowir assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("throwableTemplate"); span = spans.poll(); assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs4-0"); - assertThat(span.getError().getCause()) + assertThat(span.getError()) .isInstanceOf(IllegalStateException.class) .hasMessage("obs4 run time exception"); + assertThat(meterRegistry.get("spring.kafka.listener") + .tag("error", "IllegalStateException") + .timer().count()).isEqualTo(1); + assertThat(config.scopeInFailureReference.get()).isNotNull(); }