From 92cb634a416bcd31cf36dbf751482ab448e50a25 Mon Sep 17 00:00:00 2001 From: Wzy19930507 <1208931582@qq.com> Date: Thu, 29 Feb 2024 01:33:59 +0800 Subject: [PATCH 1/3] Listener exceptions are not saved to the observation. * Embedded the (original) exception into the observation, allowing downstream tracing code to handle it. * Add unit test for observation Error and RuntimeException. --- .../KafkaMessageListenerContainer.java | 52 ++++----- .../support/micrometer/ObservationTests.java | 104 ++++++++++++++++-- 2 files changed, 122 insertions(+), 34 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 14fcd2e3aa..d02faa72db 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -2762,37 +2762,37 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord cReco DefaultKafkaListenerObservationConvention.INSTANCE, () -> new KafkaRecordReceiverContext(cRecord, getListenerId(), this::clusterId), this.observationRegistry); - return observation.observe(() -> { - try { + try { + observation.observe(() -> { invokeOnMessage(cRecord); successTimer(sample, cRecord); recordInterceptAfter(cRecord, null); + }); + } + catch (RuntimeException e) { + failureTimer(sample, cRecord); + recordInterceptAfter(cRecord, e); + if (this.commonErrorHandler == null) { + throw e; } - catch (RuntimeException e) { - failureTimer(sample, cRecord); - recordInterceptAfter(cRecord, e); - if (this.commonErrorHandler == null) { - throw e; - } - try { - invokeErrorHandler(cRecord, iterator, e); - commitOffsetsIfNeededAfterHandlingError(cRecord); - } - catch (KafkaException ke) { - ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger); - return ke; - } - catch (RuntimeException ee) { - this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION); - return ee; - } - catch (Error er) { // NOSONAR - this.logger.error(er, "Error handler threw an error"); - throw er; - } + try { + invokeErrorHandler(cRecord, iterator, e); + commitOffsetsIfNeededAfterHandlingError(cRecord); } - return null; - }); + catch (KafkaException ke) { + ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger); + return ke; + } + catch (RuntimeException ee) { + this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION); + return ee; + } + catch (Error er) { // NOSONAR + this.logger.error(er, "Error handler threw an error"); + throw er; + } + } + return null; } private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord cRecord) { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java index d67396aa8b..1d392b7e22 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 the original author or authors. + * Copyright 2022-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,6 +35,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.junit.jupiter.api.Test; @@ -54,7 +55,6 @@ import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.listener.AbstractMessageListenerContainer; import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention; import org.springframework.kafka.support.micrometer.KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention; import org.springframework.kafka.test.EmbeddedKafkaBroker; @@ -85,14 +85,20 @@ /** * @author Gary Russell * @author Artem Bilan + * @author Wang Zhiyang * * @since 3.0 */ @SpringJUnitConfig -@EmbeddedKafka(topics = { "observation.testT1", "observation.testT2", "ObservationTests.testT3" }) +@EmbeddedKafka(topics = { "observation.testT1", "observation.testT2", "observation.testT3", + ObservationTests.OBSERVATION_RUNTIME_EXCEPTION, ObservationTests.OBSERVATION_ERROR}) @DirtiesContext public class ObservationTests { + public final static String OBSERVATION_RUNTIME_EXCEPTION = "observation.runtime-exception"; + + public final static String OBSERVATION_ERROR = "observation.error"; + @Test void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate template, @Autowired SimpleTracer tracer, @Autowired KafkaListenerEndpointRegistry rler, @@ -106,8 +112,8 @@ void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate hdr.value()).isEqualTo("some foo value".getBytes()); - assertThat(headers.lastHeader("bar")).extracting(hdr -> hdr.value()).isEqualTo("some bar value".getBytes()); + assertThat(headers.lastHeader("foo")).extracting(Header::value).isEqualTo("some foo value".getBytes()); + assertThat(headers.lastHeader("bar")).extracting(Header::value).isEqualTo("some bar value".getBytes()); Deque spans = tracer.getSpans(); assertThat(spans).hasSize(4); SimpleSpan span = spans.poll(); @@ -148,14 +154,15 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) } }); + rler.getListenerContainer("obs1").stop(); rler.getListenerContainer("obs1").start(); template.send("observation.testT1", "test").get(10, TimeUnit.SECONDS); assertThat(listener.latch2.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(listener.record).isNotNull(); headers = listener.record.headers(); - assertThat(headers.lastHeader("foo")).extracting(hdr -> hdr.value()).isEqualTo("some foo value".getBytes()); - assertThat(headers.lastHeader("bar")).extracting(hdr -> hdr.value()).isEqualTo("some bar value".getBytes()); + assertThat(headers.lastHeader("foo")).extracting(Header::value).isEqualTo("some foo value".getBytes()); + assertThat(headers.lastHeader("bar")).extracting(Header::value).isEqualTo("some bar value".getBytes()); assertThat(spans).hasSize(4); span = spans.poll(); assertThat(span.getTags()).containsEntry("spring.kafka.template.name", "template"); @@ -230,6 +237,48 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) .doesNotHaveMeterWithNameAndTags("spring.kafka.template", KeyValues.of("error", "KafkaException")); } + @Test + void observationRuntimeException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer, + @Autowired @Qualifier("runtimeExceptionTemplate") KafkaTemplate runtimeExceptionTemplate, + @Autowired KafkaListenerEndpointRegistry endpointRegistry) + throws ExecutionException, InterruptedException, TimeoutException { + + runtimeExceptionTemplate.send(OBSERVATION_RUNTIME_EXCEPTION, "testRuntimeException").get(10, TimeUnit.SECONDS); + assertThat(listener.latch4.await(10, TimeUnit.SECONDS)).isTrue(); + endpointRegistry.getListenerContainer("obs4").stop(); + + Deque spans = tracer.getSpans(); + assertThat(spans).hasSize(2); + SimpleSpan span = spans.poll(); + assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("runtimeExceptionTemplate"); + span = spans.poll(); + assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs4-0"); + assertThat(span.getError().getCause().getCause()) + .isInstanceOf(IllegalStateException.class) + .hasMessage("obs4 run time exception"); + } + + @Test + void observationErrorException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer, + @Autowired @Qualifier("errorTemplate") KafkaTemplate errorTemplate, + @Autowired KafkaListenerEndpointRegistry endpointRegistry) + throws ExecutionException, InterruptedException, TimeoutException { + + errorTemplate.send(OBSERVATION_ERROR, "testError").get(10, TimeUnit.SECONDS); + assertThat(listener.latch5.await(10, TimeUnit.SECONDS)).isTrue(); + endpointRegistry.getListenerContainer("obs5").stop(); + + Deque spans = tracer.getSpans(); + assertThat(spans).hasSize(2); + SimpleSpan span = spans.poll(); + assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("errorTemplate"); + span = spans.poll(); + assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs5-0"); + assertThat(span.getError()) + .isInstanceOf(Error.class) + .hasMessage("obs5 error"); + } + @Configuration @EnableKafka public static class Config { @@ -276,6 +325,20 @@ KafkaTemplate customTemplate(ProducerFactory p return template; } + @Bean + KafkaTemplate runtimeExceptionTemplate(ProducerFactory pf) { + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setObservationEnabled(true); + return template; + } + + @Bean + KafkaTemplate errorTemplate(ProducerFactory pf) { + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setObservationEnabled(true); + return template; + } + @Bean ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( ConsumerFactory cf) { @@ -286,7 +349,7 @@ ConcurrentKafkaListenerContainerFactory kafkaListenerContainerF factory.getContainerProperties().setObservationEnabled(true); factory.setContainerCustomizer(container -> { if (container.getListenerId().equals("obs3")) { - ((AbstractMessageListenerContainer) container).setKafkaAdmin(this.mockAdmin); + container.setKafkaAdmin(this.mockAdmin); } }); return factory; @@ -352,6 +415,11 @@ Listener listener(KafkaTemplate template) { return new Listener(template); } + @Bean + ExceptionListener exceptionListener() { + return new ExceptionListener(); + } + } public static class Listener { @@ -387,4 +455,24 @@ void listen3(ConsumerRecord in) { } + public static class ExceptionListener { + + final CountDownLatch latch4 = new CountDownLatch(1); + + final CountDownLatch latch5 = new CountDownLatch(1); + + @KafkaListener(id = "obs4", topics = OBSERVATION_RUNTIME_EXCEPTION) + void listenRuntimeException(ConsumerRecord in) { + this.latch4.countDown(); + throw new IllegalStateException("obs4 run time exception"); + } + + @KafkaListener(id = "obs5", topics = OBSERVATION_ERROR) + void listenError(ConsumerRecord in) { + this.latch5.countDown(); + throw new Error("obs5 error"); + } + + } + } From 23a0eda0595062d90e43cc421cc6528152d7bfd4 Mon Sep 17 00:00:00 2001 From: Wzy19930507 <1208931582@qq.com> Date: Thu, 29 Feb 2024 02:41:35 +0800 Subject: [PATCH 2/3] Miss the commit. --- .../kafka/support/micrometer/ObservationTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java index 1d392b7e22..e1d74f81a9 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java @@ -253,7 +253,7 @@ void observationRuntimeException(@Autowired ExceptionListener listener, @Autowir assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("runtimeExceptionTemplate"); span = spans.poll(); assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs4-0"); - assertThat(span.getError().getCause().getCause()) + assertThat(span.getError().getCause()) .isInstanceOf(IllegalStateException.class) .hasMessage("obs4 run time exception"); } From dbad9756ec79930cb1342239e3b94fa0da343e36 Mon Sep 17 00:00:00 2001 From: Wzy19930507 <1208931582@qq.com> Date: Thu, 29 Feb 2024 10:07:50 +0800 Subject: [PATCH 3/3] Unify the runtimeExceptionTemplate and errorTemplate into a throwableTemplate. --- .../support/micrometer/ObservationTests.java | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java index e1d74f81a9..b5caf4e51e 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java @@ -239,7 +239,7 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) @Test void observationRuntimeException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer, - @Autowired @Qualifier("runtimeExceptionTemplate") KafkaTemplate runtimeExceptionTemplate, + @Autowired @Qualifier("throwableTemplate") KafkaTemplate runtimeExceptionTemplate, @Autowired KafkaListenerEndpointRegistry endpointRegistry) throws ExecutionException, InterruptedException, TimeoutException { @@ -250,7 +250,7 @@ void observationRuntimeException(@Autowired ExceptionListener listener, @Autowir Deque spans = tracer.getSpans(); assertThat(spans).hasSize(2); SimpleSpan span = spans.poll(); - assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("runtimeExceptionTemplate"); + assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("throwableTemplate"); span = spans.poll(); assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs4-0"); assertThat(span.getError().getCause()) @@ -260,7 +260,7 @@ void observationRuntimeException(@Autowired ExceptionListener listener, @Autowir @Test void observationErrorException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer, - @Autowired @Qualifier("errorTemplate") KafkaTemplate errorTemplate, + @Autowired @Qualifier("throwableTemplate") KafkaTemplate errorTemplate, @Autowired KafkaListenerEndpointRegistry endpointRegistry) throws ExecutionException, InterruptedException, TimeoutException { @@ -271,7 +271,7 @@ void observationErrorException(@Autowired ExceptionListener listener, @Autowired Deque spans = tracer.getSpans(); assertThat(spans).hasSize(2); SimpleSpan span = spans.poll(); - assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("errorTemplate"); + assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("throwableTemplate"); span = spans.poll(); assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs5-0"); assertThat(span.getError()) @@ -326,14 +326,7 @@ KafkaTemplate customTemplate(ProducerFactory p } @Bean - KafkaTemplate runtimeExceptionTemplate(ProducerFactory pf) { - KafkaTemplate template = new KafkaTemplate<>(pf); - template.setObservationEnabled(true); - return template; - } - - @Bean - KafkaTemplate errorTemplate(ProducerFactory pf) { + KafkaTemplate throwableTemplate(ProducerFactory pf) { KafkaTemplate template = new KafkaTemplate<>(pf); template.setObservationEnabled(true); return template;