diff --git a/ci/publish-documentation-to-github-pages.sh b/ci/publish-documentation-to-github-pages.sh index f1197cb5e5..074ae4efcf 100755 --- a/ci/publish-documentation-to-github-pages.sh +++ b/ci/publish-documentation-to-github-pages.sh @@ -2,8 +2,13 @@ . $(pwd)/release-versions.txt +./mvnw clean test-compile exec:java \ + -Dexec.mainClass=io.micrometer.docs.DocsGeneratorCommand \ + -Dexec.classpathScope="test" \ + -Dexec.args='src/main/java/com/rabbitmq/stream/observation/micrometer .* target/micrometer-observation-docs' + MESSAGE=$(git log -1 --pretty=%B) -./mvnw clean buildnumber:create pre-site --no-transfer-progress +./mvnw buildnumber:create pre-site --no-transfer-progress ./mvnw javadoc:javadoc -Dmaven.javadoc.skip=false --no-transfer-progress diff --git a/pom.xml b/pom.xml index 445ede3a78..913753754b 100644 --- a/pom.xml +++ b/pom.xml @@ -74,6 +74,8 @@ 2.10.1 0.10.4 1.2.5 + 1.1.3 + 1.0.2 3.11.0 3.1.2 2.7.6 @@ -281,7 +283,7 @@ org.eclipse.paho org.eclipse.paho.client.mqttv3 ${paho.version} - test + test @@ -312,6 +314,20 @@ test + + io.micrometer + micrometer-tracing-integration-test + ${micrometer-tracing-test.version} + test + + + + io.micrometer + micrometer-docs-generator + ${micrometer-docs-generator.version} + test + + org.openjdk.jmh jmh-core @@ -393,6 +409,9 @@ org.apache.maven.plugins maven-resources-plugin ${maven-resources-plugin.version} + + ${project.build.sourceEncoding} + @@ -457,6 +476,7 @@ - coderay ../../test/java/com/rabbitmq/stream/docs + ${project.build.directory} diff --git a/src/docs/asciidoc/appendixes.adoc b/src/docs/asciidoc/appendixes.adoc new file mode 100644 index 0000000000..2cbda3e8c4 --- /dev/null +++ b/src/docs/asciidoc/appendixes.adoc @@ -0,0 +1,26 @@ +ifndef::build-directory[:build-directory: ../../../target] +:test-examples: ../../test/java/com/rabbitmq/stream/docs + +[appendix] +== Micrometer Observation + +It is possible to use https://micrometer.io/docs/observation[Micrometer Observation] to instrument publishing and consuming in the stream Java client. +Micrometer Observation provides https://spring.io/blog/2022/10/12/observability-with-spring-boot-3[metrics, tracing, and log correlation with one single API]. + +The stream Java client provides an `ObservationCollector` abstraction and an implementation for Micrometer Observation. +The following snippet shows how to create and set up the Micrometer `ObservationCollector` implementation with an existing `ObservationRegistry`: + +.Configuring Micrometer Observation +[source,java,indent=0] +-------- +include::{test-examples}/EnvironmentUsage.java[tag=micrometer-observation] +-------- +<1> Configure Micrometer `ObservationCollector` with builder +<2> Set Micrometer `ObservationRegistry` + +The next sections document the conventions, spans, and metrics made available by the instrumentation. +They are automatically generated from the source code with the https://github.com/micrometer-metrics/micrometer-docs-generator[Micrometer documentation generator]. + +include::{build-directory}/micrometer-observation-docs/_conventions.adoc[] +include::{build-directory}/micrometer-observation-docs/_spans.adoc[] +include::{build-directory}/micrometer-observation-docs/_metrics.adoc[] \ No newline at end of file diff --git a/src/docs/asciidoc/index.adoc b/src/docs/asciidoc/index.adoc index 38bf91df28..0ff03dd549 100644 --- a/src/docs/asciidoc/index.adoc +++ b/src/docs/asciidoc/index.adoc @@ -1,6 +1,7 @@ = RabbitMQ Stream Java Client :revnumber: {project-version} :revremark: ({build-number}) +:appendix-caption: Appendix ifndef::imagesdir[:imagesdir: images] ifndef::sourcedir[:sourcedir: ../../main/java] :source-highlighter: prettify @@ -28,4 +29,6 @@ include::advanced-topics.adoc[] include::building.adoc[] -include::performance-tool.adoc[] \ No newline at end of file +include::performance-tool.adoc[] + +include::appendixes.adoc[] \ No newline at end of file diff --git a/src/main/java/com/rabbitmq/stream/Codec.java b/src/main/java/com/rabbitmq/stream/Codec.java index 1eb5c88702..1eff41fc50 100644 --- a/src/main/java/com/rabbitmq/stream/Codec.java +++ b/src/main/java/com/rabbitmq/stream/Codec.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). diff --git a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java index b6845ca73c..f4a730046a 100644 --- a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java +++ b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java @@ -222,6 +222,8 @@ public interface EnvironmentBuilder { */ EnvironmentBuilder metricsCollector(MetricsCollector metricsCollector); + EnvironmentBuilder observationCollector(ObservationCollector observationCollector); + /** * The maximum number of producers allocated to a single connection. * diff --git a/src/main/java/com/rabbitmq/stream/Message.java b/src/main/java/com/rabbitmq/stream/Message.java index 6651a87c89..296007fc21 100644 --- a/src/main/java/com/rabbitmq/stream/Message.java +++ b/src/main/java/com/rabbitmq/stream/Message.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). @@ -85,4 +85,31 @@ public interface Message { * @return the message annotations */ Map getMessageAnnotations(); + + /** + * Add a message annotation to the message. + * + * @param key the message annotation key + * @param value the message annotation value + * @return the modified message + * @since 0.12.0 + */ + default Message annotate(String key, Object value) { + this.getMessageAnnotations().put(key, value); + return this; + } + + /** + * Create a copy of the message. + * + *

The message copy contains the exact same instances of the original bare message (body, + * properties, application properties), only the message annotations are actually copied and can + * be modified independently. + * + * @return the message copy + * @since 0.12.0 + */ + default Message copy() { + return this; + } } diff --git a/src/main/java/com/rabbitmq/stream/ObservationCollector.java b/src/main/java/com/rabbitmq/stream/ObservationCollector.java new file mode 100644 index 0000000000..e10c242de8 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/ObservationCollector.java @@ -0,0 +1,83 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream; + +/** + * API to instrument operations in the stream client. The supported operations are publishing, and + * asynchronous delivery. + * + *

Implementations can gather information and send it to tracing backends. This allows e.g. + * following the processing steps of a given message through different systems. + * + *

This is considered an SPI and is susceptible to change at any time. + * + * @since 0.12.0 + * @see EnvironmentBuilder#observationCollector(ObservationCollector) + * @see com.rabbitmq.stream.observation.micrometer.MicrometerObservationCollectorBuilder + */ +public interface ObservationCollector { + + ObservationCollector NO_OP = + new ObservationCollector() { + @Override + public Void prePublish(String stream, Message message) { + return null; + } + + @Override + public void published(Void context, Message message) {} + + @Override + public MessageHandler subscribe(MessageHandler handler) { + return handler; + } + }; + + /** + * Start observation. + * + *

Implementations are expecting to return an observation context that will be passed in to the + * {@link #published(Object, Message)} callback. + * + * @param stream the stream to publish to + * @param message the message to publish + * @return observation context + */ + T prePublish(String stream, Message message); + + /** + * Callback when the message is about to be published. + * + * @param context the observation context + * @param message the message to publish + */ + void published(T context, Message message); + + /** + * Decorate consumer registration. + * + * @param handler the original handler + * @return a decorated handler + */ + MessageHandler subscribe(MessageHandler handler); + + /** + * Says whether the implementation does nothing or not. + * + * @return true if the implementation is a no-op + */ + default boolean isNoop() { + return this == NO_OP; + } +} diff --git a/src/main/java/com/rabbitmq/stream/codec/QpidProtonCodec.java b/src/main/java/com/rabbitmq/stream/codec/QpidProtonCodec.java index 094c619341..3a316cf279 100644 --- a/src/main/java/com/rabbitmq/stream/codec/QpidProtonCodec.java +++ b/src/main/java/com/rabbitmq/stream/codec/QpidProtonCodec.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). @@ -32,6 +32,7 @@ import org.apache.qpid.proton.codec.WritableBuffer; public class QpidProtonCodec implements Codec { + private static final Function MESSAGE_ANNOTATIONS_STRING_KEY_EXTRACTOR = k -> k; private static final Function MESSAGE_ANNOTATIONS_SYMBOL_KEY_EXTRACTOR = Symbol::toString; @@ -52,7 +53,7 @@ private static Map createMessageAnnotations( return createMapFromAmqpMap( MESSAGE_ANNOTATIONS_SYMBOL_KEY_EXTRACTOR, message.getMessageAnnotations().getValue()); } else { - return null; + return new LinkedHashMap<>(); } } @@ -258,7 +259,7 @@ public Message decode(byte[] data) { createMessageAnnotations(message)); } - protected Properties createProperties(org.apache.qpid.proton.message.Message message) { + protected static Properties createProperties(org.apache.qpid.proton.message.Message message) { if (message.getProperties() != null) { return new QpidProtonProperties(message.getProperties()); } else { @@ -504,6 +505,21 @@ public Map getApplicationProperties() { public Map getMessageAnnotations() { return messageAnnotations; } + + @Override + public Message annotate(String key, Object value) { + this.messageAnnotations.put(key, value); + return this; + } + + @Override + public Message copy() { + return new QpidProtonMessage( + message, + createProperties(message), + createApplicationProperties(message), + createMessageAnnotations(message)); + } } static class QpidProtonAmqpMessageWrapper implements Message { @@ -579,6 +595,38 @@ public Map getMessageAnnotations() { return null; } } + + @Override + public Message annotate(String key, Object value) { + MessageAnnotations annotations = this.message.getMessageAnnotations(); + if (annotations == null) { + annotations = new MessageAnnotations(new LinkedHashMap<>()); + this.message.setMessageAnnotations(annotations); + } + annotations.getValue().put(Symbol.getSymbol(key), value); + return this; + } + + @Override + public Message copy() { + org.apache.qpid.proton.message.Message copy = + org.apache.qpid.proton.message.Message.Factory.create(); + copy.setProperties(this.message.getProperties()); + copy.setBody(this.message.getBody()); + copy.setApplicationProperties(this.message.getApplicationProperties()); + if (this.message.getMessageAnnotations() != null) { + Map annotations = message.getMessageAnnotations().getValue(); + Map annotationCopy; + if (annotations == null) { + annotationCopy = null; + } else { + annotationCopy = new LinkedHashMap<>(annotations.size()); + annotationCopy.putAll(annotations); + } + copy.setMessageAnnotations(new MessageAnnotations(annotationCopy)); + } + return new QpidProtonAmqpMessageWrapper(this.hasPublishingId, this.publishingId, copy); + } } // from diff --git a/src/main/java/com/rabbitmq/stream/codec/QpidProtonMessageBuilder.java b/src/main/java/com/rabbitmq/stream/codec/QpidProtonMessageBuilder.java index c94b2b727b..445060d608 100644 --- a/src/main/java/com/rabbitmq/stream/codec/QpidProtonMessageBuilder.java +++ b/src/main/java/com/rabbitmq/stream/codec/QpidProtonMessageBuilder.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). diff --git a/src/main/java/com/rabbitmq/stream/codec/SimpleCodec.java b/src/main/java/com/rabbitmq/stream/codec/SimpleCodec.java index fba102f34d..7c45a63497 100644 --- a/src/main/java/com/rabbitmq/stream/codec/SimpleCodec.java +++ b/src/main/java/com/rabbitmq/stream/codec/SimpleCodec.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). diff --git a/src/main/java/com/rabbitmq/stream/codec/SwiftMqCodec.java b/src/main/java/com/rabbitmq/stream/codec/SwiftMqCodec.java index 6b630514e8..748c2d7044 100644 --- a/src/main/java/com/rabbitmq/stream/codec/SwiftMqCodec.java +++ b/src/main/java/com/rabbitmq/stream/codec/SwiftMqCodec.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). @@ -276,11 +276,11 @@ public EncodedMessage encode(Message message) { outboundMessage.writeContent(output); return new EncodedMessage(output.getCount(), output.getBuffer()); } catch (IOException e) { - throw new StreamException("Error while writing AMQP 1.0 message to output stream"); + throw new StreamException("Error while writing AMQP 1.0 message to output stream", e); } } - protected AMQPType convertToSwiftMqType(Object value) { + protected static AMQPType convertToSwiftMqType(Object value) { if (value instanceof Boolean) { return ((Boolean) value).booleanValue() ? AMQPBoolean.TRUE : AMQPBoolean.FALSE; } else if (value instanceof Byte) { @@ -355,7 +355,7 @@ protected Message createMessage(byte[] data) { try { amqpMessage = new AMQPMessage(data); } catch (Exception e) { - throw new StreamException("Error while decoding AMQP 1.0 message"); + throw new StreamException("Error while decoding AMQP 1.0 message", e); } Object body = extractBody(amqpMessage); @@ -648,5 +648,47 @@ public Map getMessageAnnotations() { return null; } } + + @Override + public Message annotate(String key, Object value) { + MessageAnnotations annotations = this.message.getMessageAnnotations(); + Map map; + try { + if (annotations == null) { + map = new LinkedHashMap<>(); + annotations = new MessageAnnotations(map); + this.message.setMessageAnnotations(annotations); + } else { + map = annotations.getValue(); + } + map.put(new AMQPSymbol(key), convertToSwiftMqType(value)); + annotations.setValue(map); + } catch (IOException e) { + throw new StreamException("Error while annotating SwiftMQ message", e); + } + return this; + } + + @Override + public Message copy() { + AMQPMessage copy = new AMQPMessage(); + copy.setProperties(this.message.getProperties()); + if (this.message.getData() != null) { + this.message.getData().forEach(copy::addData); + } + copy.setApplicationProperties(this.message.getApplicationProperties()); + MessageAnnotations annotations = this.message.getMessageAnnotations(); + if (annotations != null) { + Map annotationCopy = null; + try { + annotationCopy = new LinkedHashMap<>(annotations.getValue().size()); + annotationCopy.putAll(annotations.getValue()); + copy.setMessageAnnotations(new MessageAnnotations(annotationCopy)); + } catch (IOException e) { + throw new StreamException("Error while copying SwiftMQ message annotations", e); + } + } + return new SwiftMqAmqpMessageWrapper(this.hasPublishingId, this.publishingId, copy); + } } } diff --git a/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java index e40f6ada11..7be47f45e6 100644 --- a/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). @@ -30,12 +30,14 @@ interface AccumulatedEntity { long time(); - long publishindId(); + long publishingId(); String filterValue(); Object encodedEntity(); StreamProducer.ConfirmationCallback confirmationCallback(); + + Object observationContext(); } } diff --git a/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java index 5f01ea1416..b8d68eb920 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). @@ -13,11 +13,7 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; -import com.rabbitmq.stream.Codec; -import com.rabbitmq.stream.ConfirmationHandler; -import com.rabbitmq.stream.ConfirmationStatus; -import com.rabbitmq.stream.Message; -import com.rabbitmq.stream.StreamException; +import com.rabbitmq.stream.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -31,18 +27,23 @@ class SimpleMessageAccumulator implements MessageAccumulator { protected final BlockingQueue messages; protected final Clock clock; private final int capacity; - private final Codec codec; + protected final Codec codec; private final int maxFrameSize; private final ToLongFunction publishSequenceFunction; private final Function filterValueExtractor; + final String stream; + final ObservationCollector observationCollector; + @SuppressWarnings("unchecked") SimpleMessageAccumulator( int capacity, Codec codec, int maxFrameSize, ToLongFunction publishSequenceFunction, Function filterValueExtractor, - Clock clock) { + Clock clock, + String stream, + ObservationCollector observationCollector) { this.capacity = capacity; this.messages = new LinkedBlockingQueue<>(capacity); this.codec = codec; @@ -51,9 +52,12 @@ class SimpleMessageAccumulator implements MessageAccumulator { this.filterValueExtractor = filterValueExtractor == null ? NULL_FILTER_VALUE_EXTRACTOR : filterValueExtractor; this.clock = clock; + this.stream = stream; + this.observationCollector = (ObservationCollector) observationCollector; } public boolean add(Message message, ConfirmationHandler confirmationHandler) { + Object observationContext = this.observationCollector.prePublish(this.stream, message); Codec.EncodedMessage encodedMessage = this.codec.encode(message); Client.checkMessageFitsInFrame(this.maxFrameSize, encodedMessage); long publishingId = this.publishSequenceFunction.applyAsLong(message); @@ -65,7 +69,8 @@ public boolean add(Message message, ConfirmationHandler confirmationHandler) { publishingId, this.filterValueExtractor.apply(message), encodedMessage, - new SimpleConfirmationCallback(message, confirmationHandler)), + new SimpleConfirmationCallback(message, confirmationHandler), + observationContext), 60, TimeUnit.SECONDS); if (!offered) { @@ -79,7 +84,12 @@ public boolean add(Message message, ConfirmationHandler confirmationHandler) { @Override public AccumulatedEntity get() { - return this.messages.poll(); + AccumulatedEntity entity = this.messages.poll(); + if (entity != null) { + this.observationCollector.published( + entity.observationContext(), entity.confirmationCallback().message()); + } + return entity; } @Override @@ -99,22 +109,25 @@ private static final class SimpleAccumulatedEntity implements AccumulatedEntity private final String filterValue; private final Codec.EncodedMessage encodedMessage; private final StreamProducer.ConfirmationCallback confirmationCallback; + private final Object observationContext; private SimpleAccumulatedEntity( long time, long publishingId, String filterValue, Codec.EncodedMessage encodedMessage, - StreamProducer.ConfirmationCallback confirmationCallback) { + StreamProducer.ConfirmationCallback confirmationCallback, + Object observationContext) { this.time = time; this.publishingId = publishingId; this.encodedMessage = encodedMessage; this.filterValue = filterValue; this.confirmationCallback = confirmationCallback; + this.observationContext = observationContext; } @Override - public long publishindId() { + public long publishingId() { return publishingId; } @@ -137,6 +150,11 @@ public long time() { public StreamProducer.ConfirmationCallback confirmationCallback() { return confirmationCallback; } + + @Override + public Object observationContext() { + return this.observationContext; + } } private static final class SimpleConfirmationCallback @@ -155,5 +173,10 @@ public int handle(boolean confirmed, short code) { confirmationHandler.handle(new ConfirmationStatus(message, confirmed, code)); return 1; } + + @Override + public Message message() { + return this.message; + } } } diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java index 2d54e2fa95..b1a79ed8e0 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java @@ -219,6 +219,8 @@ public Consumer build() { }; } + handler = this.environment.observationCollector().subscribe(handler); + Consumer consumer; if (this.stream != null) { consumer = diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index 75008c3eaf..aed2a97930 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -85,6 +85,7 @@ class StreamEnvironment implements Environment { private final Runnable locatorInitializationSequence; private final List locators = new CopyOnWriteArrayList<>(); private final ExecutorServiceFactory executorServiceFactory; + private final ObservationCollector observationCollector; StreamEnvironment( ScheduledExecutorService scheduledExecutorService, @@ -100,7 +101,8 @@ class StreamEnvironment implements Environment { ByteBufAllocator byteBufAllocator, boolean lazyInit, Function connectionNamingStrategy, - Function clientFactory) { + Function clientFactory, + ObservationCollector observationCollector) { this.recoveryBackOffDelayPolicy = recoveryBackOffDelayPolicy; this.topologyUpdateBackOffDelayPolicy = topologyBackOffDelayPolicy; this.byteBufAllocator = byteBufAllocator; @@ -108,6 +110,7 @@ class StreamEnvironment implements Environment { clientParametersPrototype = maybeSetUpClientParametersFromUris(uris, clientParametersPrototype); this.addressResolver = addressResolver; + this.observationCollector = observationCollector; boolean tls; if (tlsConfiguration != null && tlsConfiguration.enabled()) { @@ -643,6 +646,10 @@ CompressionCodecFactory compressionCodecFactory() { return this.clientParametersPrototype.compressionCodecFactory; } + ObservationCollector observationCollector() { + return this.observationCollector; + } + Runnable registerConsumer( StreamConsumer consumer, String stream, diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java index e087ebc64b..47ed8145d2 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java @@ -15,13 +15,7 @@ import static com.rabbitmq.stream.impl.Utils.noOpConsumer; -import com.rabbitmq.stream.AddressResolver; -import com.rabbitmq.stream.BackOffDelayPolicy; -import com.rabbitmq.stream.ChunkChecksum; -import com.rabbitmq.stream.Codec; -import com.rabbitmq.stream.Environment; -import com.rabbitmq.stream.EnvironmentBuilder; -import com.rabbitmq.stream.StreamException; +import com.rabbitmq.stream.*; import com.rabbitmq.stream.compression.CompressionCodecFactory; import com.rabbitmq.stream.impl.Utils.ClientConnectionType; import com.rabbitmq.stream.metrics.MetricsCollector; @@ -69,6 +63,7 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder { private CompressionCodecFactory compressionCodecFactory; private boolean lazyInit = false; private Function clientFactory = Client::new; + private ObservationCollector observationCollector = ObservationCollector.NO_OP; public StreamEnvironmentBuilder() {} @@ -286,6 +281,12 @@ StreamEnvironmentBuilder clientFactory(Function return this; } + @Override + public EnvironmentBuilder observationCollector(ObservationCollector observationCollector) { + this.observationCollector = observationCollector; + return this; + } + @Override public Environment build() { if (this.compressionCodecFactory == null) { @@ -300,6 +301,7 @@ public Environment build() { this.clientParameters.byteBufAllocator(this.netty.byteBufAllocator); this.clientParameters.channelCustomizer(this.netty.channelCustomizer); this.clientParameters.bootstrapCustomizer(this.netty.bootstrapCustomizer); + return new StreamEnvironment( scheduledExecutorService, clientParameters, @@ -314,7 +316,8 @@ public Environment build() { netty.byteBufAllocator, lazyInit, connectionNamingStrategy, - this.clientFactory); + this.clientFactory, + this.observationCollector); } static final class DefaultTlsConfiguration implements TlsConfiguration { diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java index 84ff8a6296..d68af379a1 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). @@ -73,7 +73,7 @@ class StreamProducer implements Producer { private final int maxUnconfirmedMessages; private final Codec codec; private final ToLongFunction publishSequenceFunction = - entity -> ((AccumulatedEntity) entity).publishindId(); + entity -> ((AccumulatedEntity) entity).publishingId(); private final long enqueueTimeoutMs; private final boolean blockOnMaxUnconfirmed; private volatile Client client; @@ -122,7 +122,9 @@ class StreamProducer implements Producer { client.maxFrameSize(), accumulatorPublishSequenceFunction, filterValueExtractor, - this.environment.clock()); + this.environment.clock(), + stream, + this.environment.observationCollector()); if (filterValueExtractor == null) { delegateWriteCallback = Client.OUTBOUND_MESSAGE_WRITE_CALLBACK; } else { @@ -140,7 +142,9 @@ class StreamProducer implements Producer { this.environment.byteBufAllocator(), client.maxFrameSize(), accumulatorPublishSequenceFunction, - this.environment.clock()); + this.environment.clock(), + stream, + environment.observationCollector()); delegateWriteCallback = Client.OUTBOUND_MESSAGE_BATCH_WRITE_CALLBACK; } @@ -559,6 +563,8 @@ enum Status { interface ConfirmationCallback { int handle(boolean confirmed, short code); + + Message message(); } @Override diff --git a/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java index 9533f2a922..0d2c56c8f9 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). @@ -16,6 +16,7 @@ import com.rabbitmq.stream.Codec; import com.rabbitmq.stream.Codec.EncodedMessage; import com.rabbitmq.stream.Message; +import com.rabbitmq.stream.ObservationCollector; import com.rabbitmq.stream.compression.Compression; import com.rabbitmq.stream.compression.CompressionCodec; import com.rabbitmq.stream.impl.Client.EncodedMessageBatch; @@ -39,8 +40,18 @@ public SubEntryMessageAccumulator( ByteBufAllocator byteBufAllocator, int maxFrameSize, ToLongFunction publishSequenceFunction, - Clock clock) { - super(subEntrySize * batchSize, codec, maxFrameSize, publishSequenceFunction, null, clock); + Clock clock, + String stream, + ObservationCollector observationCollector) { + super( + subEntrySize * batchSize, + codec, + maxFrameSize, + publishSequenceFunction, + null, + clock, + stream, + observationCollector); this.subEntrySize = subEntrySize; this.compressionCodec = compressionCodec; this.compression = compressionCodec == null ? Compression.NONE.code() : compressionCodec.code(); @@ -67,6 +78,8 @@ public AccumulatedEntity get() { if (message == null) { break; } + this.observationCollector.published( + message.observationContext(), message.confirmationCallback().message()); lastMessageInBatch = message; batch.add((EncodedMessage) message.encodedEntity(), message.confirmationCallback()); count++; @@ -75,7 +88,7 @@ public AccumulatedEntity get() { return null; } else { batch.time = lastMessageInBatch.time(); - batch.publishingId = lastMessageInBatch.publishindId(); + batch.publishingId = lastMessageInBatch.publishingId(); batch.encodedMessageBatch.close(); return batch; } @@ -107,7 +120,7 @@ boolean isEmpty() { } @Override - public long publishindId() { + public long publishingId() { return publishingId; } @@ -130,6 +143,12 @@ public long time() { public StreamProducer.ConfirmationCallback confirmationCallback() { return confirmationCallback; } + + @Override + public Object observationContext() { + throw new UnsupportedOperationException( + "batch entity does not contain only one observation context"); + } } private static class CompositeConfirmationCallback @@ -152,5 +171,11 @@ public int handle(boolean confirmed, short code) { } return callbacks.size(); } + + @Override + public Message message() { + throw new UnsupportedOperationException( + "composite confirmation callback does not contain just one message"); + } } } diff --git a/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java index a3f51a106a..4783871318 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java @@ -15,14 +15,7 @@ import static com.rabbitmq.stream.impl.Utils.namedFunction; -import com.rabbitmq.stream.Codec; -import com.rabbitmq.stream.ConfirmationHandler; -import com.rabbitmq.stream.ConfirmationStatus; -import com.rabbitmq.stream.Constants; -import com.rabbitmq.stream.Message; -import com.rabbitmq.stream.MessageBuilder; -import com.rabbitmq.stream.Producer; -import com.rabbitmq.stream.RoutingStrategy; +import com.rabbitmq.stream.*; import com.rabbitmq.stream.RoutingStrategy.Metadata; import java.util.List; import java.util.Map; @@ -47,6 +40,13 @@ class SuperStreamProducer implements Producer { private final Metadata superStreamMetadata; private final AtomicBoolean closed = new AtomicBoolean(false); + /** + * passthrough, except when observation collector is not no-op and a message must be sent to + * several streams. In this it creates a copy of the message with distinct message annotations for + * each message, so that the collector can populate these messages annotations without collision + */ + private final MessageInterceptor messageInterceptor; + SuperStreamProducer( StreamProducerBuilder producerBuilder, String name, @@ -62,6 +62,10 @@ class SuperStreamProducer implements Producer { this.producerBuilder = producerBuilder.duplicate(); this.producerBuilder.stream(null); this.producerBuilder.resetRouting(); + this.messageInterceptor = + environment.observationCollector().isNoop() + ? (i, msg) -> msg + : (i, msg) -> i == 0 ? msg : msg.copy(); } @Override @@ -111,17 +115,12 @@ public void send(Message message, ConfirmationHandler confirmationHandler) { if (streams.isEmpty()) { confirmationHandler.handle( new ConfirmationStatus(message, false, Constants.CODE_NO_ROUTE_FOUND)); + } else if (streams.size() == 1) { + producer(streams.get(0)).send(message, confirmationHandler); } else { - for (String stream : streams) { - Producer producer = - producers.computeIfAbsent( - stream, - stream1 -> { - Producer p = - producerBuilder.duplicate().superStream(null).stream(stream1).build(); - return p; - }); - producer.send(message, confirmationHandler); + for (int i = 0; i < streams.size(); i++) { + Producer producer = producer(streams.get(i)); + producer(streams.get(i)).send(messageInterceptor.apply(i, message), confirmationHandler); } } } else { @@ -130,6 +129,15 @@ public void send(Message message, ConfirmationHandler confirmationHandler) { } } + private Producer producer(String stream) { + return producers.computeIfAbsent( + stream, + stream1 -> { + Producer p = producerBuilder.duplicate().superStream(null).stream(stream1).build(); + return p; + }); + } + private boolean canSend() { return !this.closed.get(); } @@ -191,4 +199,10 @@ public List route(String routingKey) { routingKey1))); } } + + @FunctionalInterface + private interface MessageInterceptor { + + Message apply(int partitionIndex, Message message); + } } diff --git a/src/main/java/com/rabbitmq/stream/observation/micrometer/DefaultProcessObservationConvention.java b/src/main/java/com/rabbitmq/stream/observation/micrometer/DefaultProcessObservationConvention.java new file mode 100644 index 0000000000..a41e9f4944 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/observation/micrometer/DefaultProcessObservationConvention.java @@ -0,0 +1,59 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.observation.micrometer; + +import static com.rabbitmq.stream.observation.micrometer.StreamObservationDocumentation.HighCardinalityTags.*; + +import com.rabbitmq.stream.observation.micrometer.StreamObservationDocumentation.LowCardinalityTags; +import io.micrometer.common.KeyValues; + +/** + * Default {@link ProcessObservationConvention}. + * + * @since 0.12.0 + */ +public class DefaultProcessObservationConvention implements ProcessObservationConvention { + + private static final String OPERATION = "process"; + private static final String OPERATION_SUFFIX = " " + OPERATION; + + @Override + public String getName() { + return "rabbitmq.stream.process"; + } + + @Override + public String getContextualName(ProcessContext context) { + return context.getStream() + OPERATION_SUFFIX; + } + + @Override + public KeyValues getLowCardinalityKeyValues(ProcessContext context) { + return KeyValues.of( + LowCardinalityTags.MESSAGING_OPERATION.withValue(OPERATION), + LowCardinalityTags.MESSAGING_SYSTEM.withValue("rabbitmq"), + LowCardinalityTags.NET_PROTOCOL_NAME.withValue("rabbitmq-stream"), + LowCardinalityTags.NET_PROTOCOL_VERSION.withValue("1.0")); + } + + @Override + public KeyValues getHighCardinalityKeyValues(ProcessContext context) { + // FIXME extract AMQP exchange and routing if present? + return KeyValues.of( + MESSAGING_DESTINATION_NAME.withValue(context.getStream()), + MESSAGING_SOURCE_NAME.withValue(context.getStream()), + MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.withValue( + String.valueOf(context.getPayloadSizeBytes()))); + } +} diff --git a/src/main/java/com/rabbitmq/stream/observation/micrometer/DefaultPublishObservationConvention.java b/src/main/java/com/rabbitmq/stream/observation/micrometer/DefaultPublishObservationConvention.java new file mode 100644 index 0000000000..bba85cf802 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/observation/micrometer/DefaultPublishObservationConvention.java @@ -0,0 +1,58 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.observation.micrometer; + +import static com.rabbitmq.stream.observation.micrometer.StreamObservationDocumentation.HighCardinalityTags.*; + +import com.rabbitmq.stream.observation.micrometer.StreamObservationDocumentation.LowCardinalityTags; +import io.micrometer.common.KeyValues; + +/** + * Default {@link PublishObservationConvention}. + * + * @since 0.12.0 + */ +public class DefaultPublishObservationConvention implements PublishObservationConvention { + + private static final String OPERATION = "publish"; + private static final String OPERATION_SUFFIX = " " + OPERATION; + + @Override + public String getName() { + return "rabbitmq.stream.publish"; + } + + @Override + public String getContextualName(PublishContext context) { + return context.stream() + OPERATION_SUFFIX; + } + + @Override + public KeyValues getLowCardinalityKeyValues(PublishContext context) { + return KeyValues.of( + LowCardinalityTags.MESSAGING_OPERATION.withValue(OPERATION), + LowCardinalityTags.MESSAGING_SYSTEM.withValue("rabbitmq"), + LowCardinalityTags.NET_PROTOCOL_NAME.withValue("rabbitmq-stream"), + LowCardinalityTags.NET_PROTOCOL_VERSION.withValue("1.0")); + } + + @Override + public KeyValues getHighCardinalityKeyValues(PublishContext context) { + return KeyValues.of( + MESSAGING_DESTINATION_NAME.withValue(context.stream()), + MESSAGING_SOURCE_NAME.withValue(context.stream()), + MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.withValue( + String.valueOf(context.getPayloadSizeBytes()))); + } +} diff --git a/src/main/java/com/rabbitmq/stream/observation/micrometer/MicrometerObservationCollector.java b/src/main/java/com/rabbitmq/stream/observation/micrometer/MicrometerObservationCollector.java new file mode 100644 index 0000000000..5c2666dbac --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/observation/micrometer/MicrometerObservationCollector.java @@ -0,0 +1,103 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.observation.micrometer; + +import com.rabbitmq.stream.*; +import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Micrometer's {@link ObservationCollector}. + * + * @since 0.12.0 + */ +class MicrometerObservationCollector implements ObservationCollector { + + private static final Logger LOGGER = + LoggerFactory.getLogger(MicrometerObservationCollector.class); + + private final ObservationRegistry registry; + private final PublishObservationConvention customPublishConvention, defaultPublishConvention; + private final ProcessObservationConvention customProcessConvention, defaultProcessConvention; + + MicrometerObservationCollector( + ObservationRegistry registry, + PublishObservationConvention customPublishConvention, + PublishObservationConvention defaultPublishConvention, + ProcessObservationConvention customProcessConvention, + ProcessObservationConvention defaultProcessConvention) { + this.registry = registry; + this.customPublishConvention = customPublishConvention; + this.defaultPublishConvention = defaultPublishConvention; + this.customProcessConvention = customProcessConvention; + this.defaultProcessConvention = defaultProcessConvention; + } + + @Override + public void published(Observation observation, Message message) { + try { + observation.stop(); + } catch (Exception e) { + LOGGER.warn("Error while stopping Micrometer observation: {}", e.getMessage()); + } + } + + @Override + public Observation prePublish(String stream, Message message) { + PublishContext context = new PublishContext(stream, message); + Observation observation = + StreamObservationDocumentation.PUBLISH_OBSERVATION.observation( + customPublishConvention, defaultPublishConvention, () -> context, registry); + observation.start(); + return observation; + } + + @Override + public MessageHandler subscribe(MessageHandler handler) { + return new ObservationMessageHandler( + handler, registry, customProcessConvention, defaultProcessConvention); + } + + private static class ObservationMessageHandler implements MessageHandler { + + private final MessageHandler delegate; + private final ObservationRegistry registry; + private final ProcessObservationConvention customProcessConvention, defaultProcessConvention; + + private ObservationMessageHandler( + MessageHandler delegate, + ObservationRegistry registry, + ProcessObservationConvention customProcessConvention, + ProcessObservationConvention defaultProcessConvention) { + this.delegate = delegate; + this.registry = registry; + this.customProcessConvention = customProcessConvention; + this.defaultProcessConvention = defaultProcessConvention; + } + + @Override + public void handle(Context context, Message message) { + ProcessContext processContext = new ProcessContext(context.stream(), message); + Observation observation = + StreamObservationDocumentation.PROCESS_OBSERVATION.observation( + this.customProcessConvention, + this.defaultProcessConvention, + () -> processContext, + this.registry); + observation.observeChecked(() -> delegate.handle(context, message)); + } + } +} diff --git a/src/main/java/com/rabbitmq/stream/observation/micrometer/MicrometerObservationCollectorBuilder.java b/src/main/java/com/rabbitmq/stream/observation/micrometer/MicrometerObservationCollectorBuilder.java new file mode 100644 index 0000000000..714fac1921 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/observation/micrometer/MicrometerObservationCollectorBuilder.java @@ -0,0 +1,139 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.observation.micrometer; + +import com.rabbitmq.stream.ObservationCollector; +import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationConvention; +import io.micrometer.observation.ObservationRegistry; +import java.util.function.Supplier; + +/** + * Builder to configure and create Micrometer + * Observation implementation of {@link ObservationCollector}. + * + * @since 0.12.0 + */ +public class MicrometerObservationCollectorBuilder { + + private ObservationRegistry registry = ObservationRegistry.NOOP; + private PublishObservationConvention customPublishObservationConvention; + private PublishObservationConvention defaultPublishObservationConvention = + new DefaultPublishObservationConvention(); + private ProcessObservationConvention customProcessObservationConvention; + private ProcessObservationConvention defaultProcessObservationConvention = + new DefaultProcessObservationConvention(); + + /** + * Set the {@link ObservationRegistry} to use. + * + *

Default is {@link ObservationRegistry#NOOP}. + * + * @param registry the registry + * @return this builder instance + */ + public MicrometerObservationCollectorBuilder registry(ObservationRegistry registry) { + this.registry = registry; + return this; + } + + /** + * Custom convention for publishing. + * + *

If not null, it will override any pre-configured conventions. + * + *

Default is null. + * + * @param customPublishObservationConvention the convention + * @return this builder instance + * @see io.micrometer.observation.docs.ObservationDocumentation#observation(ObservationConvention, + * ObservationConvention, Supplier, ObservationRegistry) + */ + public MicrometerObservationCollectorBuilder customPublishObservationConvention( + PublishObservationConvention customPublishObservationConvention) { + this.customPublishObservationConvention = customPublishObservationConvention; + return this; + } + + /** + * Default convention for publishing. + * + *

It will be picked if there was neither custom convention nor a pre-configured one via {@link + * ObservationRegistry}. + * + *

Default is {@link DefaultPublishObservationConvention}. + * + * @param defaultPublishObservationConvention the convention + * @return this builder instance + * @see io.micrometer.observation.docs.ObservationDocumentation#observation(ObservationConvention, + * ObservationConvention, Supplier, ObservationRegistry) + */ + public MicrometerObservationCollectorBuilder defaultPublishObservationConvention( + PublishObservationConvention defaultPublishObservationConvention) { + this.defaultPublishObservationConvention = defaultPublishObservationConvention; + return this; + } + + /** + * Custom convention for consuming. + * + *

If not null, it will override any pre-configured conventions. + * + *

Default is null. + * + * @param customProcessObservationConvention the convention + * @return this builder instance + * @see io.micrometer.observation.docs.ObservationDocumentation#observation(ObservationConvention, + * ObservationConvention, Supplier, ObservationRegistry) + */ + public MicrometerObservationCollectorBuilder customProcessObservationConvention( + ProcessObservationConvention customProcessObservationConvention) { + this.customProcessObservationConvention = customProcessObservationConvention; + return this; + } + + /** + * Default convention for consuming. + * + *

It will be picked if there was neither custom convention nor a pre-configured one via {@link + * ObservationRegistry}. + * + *

Default is {@link DefaultProcessObservationConvention}. + * + * @param defaultProcessObservationConvention the convention + * @return this builder instance + * @see io.micrometer.observation.docs.ObservationDocumentation#observation(ObservationConvention, + * ObservationConvention, Supplier, ObservationRegistry) + * @since 0.12.0 + */ + public MicrometerObservationCollectorBuilder defaultProcessObservationConvention( + ProcessObservationConvention defaultProcessObservationConvention) { + this.defaultProcessObservationConvention = defaultProcessObservationConvention; + return this; + } + + /** + * Create the Micrometer {@link ObservationCollector}. + * + * @return the Micrometer observation collector + */ + public ObservationCollector build() { + return new MicrometerObservationCollector( + this.registry, + this.customPublishObservationConvention, + this.defaultPublishObservationConvention, + this.customProcessObservationConvention, + this.defaultProcessObservationConvention); + } +} diff --git a/src/main/java/com/rabbitmq/stream/observation/micrometer/ProcessContext.java b/src/main/java/com/rabbitmq/stream/observation/micrometer/ProcessContext.java new file mode 100644 index 0000000000..c594fd051e --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/observation/micrometer/ProcessContext.java @@ -0,0 +1,63 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.observation.micrometer; + +import com.rabbitmq.stream.Message; +import io.micrometer.observation.transport.ReceiverContext; +import java.util.Map; + +/** + * {@link io.micrometer.observation.Observation.Context} for RabbitMQ Stream consuming. + * + * @since 0.12.0 + */ +public class ProcessContext extends ReceiverContext { + + private final String stream; + private final int payloadSizeBytes; + + ProcessContext(String stream, Message message) { + super( + (carrier, key) -> { + Map map = carrier.getMessageAnnotations(); + Object result = map == null ? null : map.get(key); + if (result == null) { + map = carrier.getApplicationProperties(); + result = map == null ? null : map.get(key); + } + if (result == null) { + return null; + } + return String.valueOf(result); + }); + this.stream = stream; + int payloadSize; + try { + byte[] body = message.getBodyAsBinary(); + payloadSize = body == null ? 0 : body.length; + } catch (Exception e) { + payloadSize = 0; + } + this.payloadSizeBytes = payloadSize; + setCarrier(message); + } + + public String getStream() { + return stream; + } + + public int getPayloadSizeBytes() { + return payloadSizeBytes; + } +} diff --git a/src/main/java/com/rabbitmq/stream/observation/micrometer/ProcessObservationConvention.java b/src/main/java/com/rabbitmq/stream/observation/micrometer/ProcessObservationConvention.java new file mode 100644 index 0000000000..26bbccf4e1 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/observation/micrometer/ProcessObservationConvention.java @@ -0,0 +1,30 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.observation.micrometer; + +import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationConvention; + +/** + * {@link ObservationConvention} for RabbitMQ Stream consuming. + * + * @since 0.12.0 + */ +public interface ProcessObservationConvention extends ObservationConvention { + + @Override + default boolean supportsContext(Observation.Context context) { + return context instanceof ProcessContext; + } +} diff --git a/src/main/java/com/rabbitmq/stream/observation/micrometer/PublishContext.java b/src/main/java/com/rabbitmq/stream/observation/micrometer/PublishContext.java new file mode 100644 index 0000000000..702b8a6700 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/observation/micrometer/PublishContext.java @@ -0,0 +1,50 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.observation.micrometer; + +import com.rabbitmq.stream.Message; +import io.micrometer.observation.transport.SenderContext; + +/** + * {@link io.micrometer.observation.Observation.Context} for RabbitMQ Stream publishing. + * + * @since 0.12.0 + */ +public class PublishContext extends SenderContext { + + private final String stream; + private final int payloadSizeBytes; + + PublishContext(String stream, Message message) { + super((carrier, key, value) -> carrier.annotate(key, value)); + this.stream = stream; + int payloadSize; + try { + byte[] body = message.getBodyAsBinary(); + payloadSize = body == null ? 0 : body.length; + } catch (Exception e) { + payloadSize = 0; + } + this.payloadSizeBytes = payloadSize; + setCarrier(message); + } + + public String stream() { + return this.stream; + } + + public int getPayloadSizeBytes() { + return this.payloadSizeBytes; + } +} diff --git a/src/main/java/com/rabbitmq/stream/observation/micrometer/PublishObservationConvention.java b/src/main/java/com/rabbitmq/stream/observation/micrometer/PublishObservationConvention.java new file mode 100644 index 0000000000..8143c5df85 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/observation/micrometer/PublishObservationConvention.java @@ -0,0 +1,30 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.observation.micrometer; + +import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationConvention; + +/** + * {@link ObservationConvention} for RabbitMQ Stream consuming. + * + * @since 0.12.0 + */ +public interface PublishObservationConvention extends ObservationConvention { + + @Override + default boolean supportsContext(Observation.Context context) { + return context instanceof PublishContext; + } +} diff --git a/src/main/java/com/rabbitmq/stream/observation/micrometer/StreamObservationDocumentation.java b/src/main/java/com/rabbitmq/stream/observation/micrometer/StreamObservationDocumentation.java new file mode 100644 index 0000000000..291d5f923e --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/observation/micrometer/StreamObservationDocumentation.java @@ -0,0 +1,141 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.observation.micrometer; + +import io.micrometer.common.docs.KeyName; +import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationConvention; +import io.micrometer.observation.docs.ObservationDocumentation; + +/** + * {@link ObservationDocumentation} for RabbitMQ Stream. + * + * @since 0.12.0 + */ +public enum StreamObservationDocumentation implements ObservationDocumentation { + + /** Observation for publishing a message. */ + PUBLISH_OBSERVATION { + + @Override + public Class> + getDefaultConvention() { + return DefaultPublishObservationConvention.class; + } + + @Override + public KeyName[] getLowCardinalityKeyNames() { + return LowCardinalityTags.values(); + } + }, + + /** Observation for processing a message. */ + PROCESS_OBSERVATION { + + @Override + public Class> + getDefaultConvention() { + return DefaultProcessObservationConvention.class; + } + + @Override + public KeyName[] getLowCardinalityKeyNames() { + return LowCardinalityTags.values(); + } + }; + + /** Low cardinality tags. */ + public enum LowCardinalityTags implements KeyName { + + /** A string identifying the messaging system. */ + MESSAGING_SYSTEM { + + @Override + public String asString() { + return "messaging.system"; + } + }, + + /** A string identifying the kind of messaging operation. */ + MESSAGING_OPERATION { + + @Override + public String asString() { + return "messaging.operation"; + } + }, + + /** A string identifying the protocol (RabbitMQ Stream). */ + NET_PROTOCOL_NAME { + + @Override + public String asString() { + return "net.protocol.name"; + } + }, + + /** A string identifying the protocol version (1.0). */ + NET_PROTOCOL_VERSION { + + @Override + public String asString() { + return "net.protocol.version"; + } + }, + } + + /** High cardinality tags. */ + public enum HighCardinalityTags implements KeyName { + + /** The message destination name. */ + MESSAGING_DESTINATION_NAME { + + @Override + public String asString() { + return "messaging.destination.name"; + } + }, + + /** The message destination name. */ + MESSAGING_SOURCE_NAME { + + @Override + public String asString() { + return "messaging.source.name"; + } + }, + + MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES { + + @Override + public String asString() { + return "messaging.message.payload_size_bytes"; + } + }, + + NET_SOCK_PEER_PORT { + @Override + public String asString() { + return "net.sock.peer.port"; + } + }, + + NET_SOCK_PEER_ADDR { + @Override + public String asString() { + return "net.sock.peer.addr"; + } + } + } +} diff --git a/src/test/java/com/rabbitmq/stream/codec/CodecsTest.java b/src/test/java/com/rabbitmq/stream/codec/CodecsTest.java index ec5be5f2ac..0509c764f6 100644 --- a/src/test/java/com/rabbitmq/stream/codec/CodecsTest.java +++ b/src/test/java/com/rabbitmq/stream/codec/CodecsTest.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). @@ -13,6 +13,7 @@ // info@rabbitmq.com. package com.rabbitmq.stream.codec; +import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.mock; @@ -32,13 +33,13 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.UUID; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import java.util.function.UnaryOperator; import java.util.stream.Stream; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.assertj.core.api.InstanceOfAssertFactories; @@ -53,26 +54,35 @@ public class CodecsTest { static UUID TEST_UUID = UUID.randomUUID(); static Iterable codecsCouples() { - List codecs = Arrays.asList(new QpidProtonCodec(), new SwiftMqCodec()); + List codecs = asList(new QpidProtonCodec(), new SwiftMqCodec()); List couples = new ArrayList<>(); for (Codec serializer : codecs) { for (Codec deserializer : codecs) { - couples.add(new CodecCouple(serializer, deserializer, () -> serializer.messageBuilder())); - couples.add(new CodecCouple(serializer, deserializer, () -> new WrapperMessageBuilder())); + couples.add(new CodecCouple(serializer, deserializer, serializer::messageBuilder)); + couples.add(new CodecCouple(serializer, deserializer, WrapperMessageBuilder::new)); + } + } + return couples; + } + + static Iterable codecsCombinations() { + List codecs = asList(new QpidProtonCodec(), new SwiftMqCodec()); + List couples = new ArrayList<>(); + for (Codec serializer : codecs) { + for (Codec deserializer : codecs) { + couples.add(new CodecCouple(serializer, deserializer, serializer::messageBuilder)); } } return couples; } static Iterable> messageBuilderSuppliers() { - return Arrays.asList( - new MessageBuilderCreator(QpidProtonMessageBuilder.class), - new MessageBuilderCreator(SwiftMqMessageBuilder.class), - new MessageBuilderCreator(WrapperMessageBuilder.class)); + return asList( + QpidProtonMessageBuilder::new, SwiftMqMessageBuilder::new, WrapperMessageBuilder::new); } static Iterable readCreatedMessage() { - return Arrays.asList( + return asList( when(mock(Codec.class).messageBuilder()).thenReturn(new WrapperMessageBuilder()).getMock(), new QpidProtonCodec(), new SwiftMqCodec()); @@ -239,6 +249,8 @@ void codecs(CodecCouple codecCouple) { .entry("annotations.null", (String) null) .messageBuilder() .build(); + outboundMessage.annotate("extra.annotation", "extra annotation value"); + Codec.EncodedMessage encoded = serializer.encode(outboundMessage); byte[] encodedData = new byte[encoded.getSize()]; @@ -457,6 +469,10 @@ void codecs(CodecCouple codecCouple) { .isInstanceOf(String.class) .isEqualTo(symbol); assertThat(inboundMessage.getMessageAnnotations().get("annotations.null")).isNull(); + assertThat(inboundMessage.getMessageAnnotations().get("extra.annotation")) + .isNotNull() + .isInstanceOf(String.class) + .isEqualTo("extra annotation value"); }); } @@ -529,8 +545,7 @@ void supportAmqpValueBody(Codec codec) { EncodedMessage encoded = new QpidProtonCodec().encode(wrapper); byte[] encodedData = new byte[encoded.getSize()]; System.arraycopy(encoded.getData(), 0, encodedData, 0, encoded.getSize()); - Message decodedMessage = codec.decode(encodedData); - return decodedMessage; + return codec.decode(encodedData); }; Message m1 = encodeDecode.apply("hello".getBytes(StandardCharsets.UTF_8)); @@ -556,6 +571,58 @@ void publishingIdShouldNotBeSetOnMessageIfNotSetOnMessageBuilder(MessageBuilder assertThat(message.getPublishingId()).isEqualTo(0); } + @ParameterizedTest + @MethodSource("codecsCombinations") + void copy(CodecCouple codecCouple) { + Codec serializer = codecCouple.serializer; + Codec deserializer = codecCouple.deserializer; + byte[] body = "hello".getBytes(StandardCharsets.UTF_8); + + Message message = + serializer + .messageBuilder() + .addData(body) + .messageAnnotations() + .entry("foo", "bar") + .messageBuilder() + .build(); + Message copy = message.copy(); + + message.annotate("original", "original value"); + copy.annotate("copy", "copy value"); + + assertThat(message.getMessageAnnotations()) + .hasSize(2) + .containsEntry("foo", "bar") + .containsEntry("original", "original value"); + + assertThat(copy.getMessageAnnotations()) + .hasSize(2) + .containsEntry("foo", "bar") + .containsEntry("copy", "copy value"); + + UnaryOperator encodeDecode = + msg -> { + EncodedMessage encoded = serializer.encode(msg); + byte[] encodedData = new byte[encoded.getSize()]; + System.arraycopy(encoded.getData(), 0, encodedData, 0, encoded.getSize()); + return deserializer.decode(encodedData); + }; + + message = encodeDecode.apply(message); + + assertThat(message.getMessageAnnotations()) + .hasSize(2) + .containsEntry("foo", "bar") + .containsEntry("original", "original value"); + + copy = encodeDecode.apply(copy); + assertThat(copy.getMessageAnnotations()) + .hasSize(2) + .containsEntry("foo", "bar") + .containsEntry("copy", "copy value"); + } + MessageTestConfiguration test( Function messageOperation, Consumer messageExpectation) { @@ -598,29 +665,4 @@ public String toString() { + messageBuilderSupplier.get().getClass().getSimpleName(); } } - - static class MessageBuilderCreator implements Supplier { - - final Supplier supplier; - - MessageBuilderCreator(Class clazz) { - supplier = - () -> { - try { - return clazz.getDeclaredConstructor().newInstance(); - } catch (Exception e) { - throw new RuntimeException(e); - } - }; - } - - public MessageBuilder get() { - return supplier.get(); - } - - @Override - public String toString() { - return get().getClass().getSimpleName(); - } - } } diff --git a/src/test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java b/src/test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java index 0b0ba4d3c5..6a8dd0d8de 100644 --- a/src/test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java +++ b/src/test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java @@ -14,12 +14,10 @@ package com.rabbitmq.stream.docs; -import com.rabbitmq.stream.Address; -import com.rabbitmq.stream.ByteCapacity; -import com.rabbitmq.stream.Environment; +import com.rabbitmq.stream.*; -import com.rabbitmq.stream.EnvironmentBuilder; -import io.netty.channel.Channel; +import com.rabbitmq.stream.observation.micrometer.MicrometerObservationCollectorBuilder; +import io.micrometer.observation.ObservationRegistry; import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollSocketChannel; @@ -30,8 +28,6 @@ import java.security.cert.X509Certificate; import java.time.Duration; import java.util.Arrays; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; public class EnvironmentUsage { @@ -153,4 +149,14 @@ void nativeEpoll() { // end::native-epoll[] } + void micrometerObservation() { + ObservationRegistry observationRegistry = ObservationRegistry.NOOP; + // tag::micrometer-observation[] + Environment environment = Environment.builder() + .observationCollector(new MicrometerObservationCollectorBuilder() // <1> + .registry(observationRegistry).build()) // <2> + .build(); + // end::micrometer-observation[] + } + } diff --git a/src/test/java/com/rabbitmq/stream/impl/DeliveryTest.java b/src/test/java/com/rabbitmq/stream/impl/DeliveryTest.java index 8b39dd3a64..37492c83d5 100644 --- a/src/test/java/com/rabbitmq/stream/impl/DeliveryTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/DeliveryTest.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java index b55996a9f7..e20f9c520b 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). @@ -20,6 +20,7 @@ import static org.mockito.Mockito.*; import com.rabbitmq.stream.BackOffDelayPolicy; +import com.rabbitmq.stream.ObservationCollector; import com.rabbitmq.stream.StreamException; import com.rabbitmq.stream.impl.Client.ClientParameters; import com.rabbitmq.stream.impl.StreamEnvironment.LocatorNotAvailableException; @@ -94,7 +95,8 @@ Client.ClientParameters duplicate() { ByteBufAllocator.DEFAULT, false, type -> "locator-connection", - cf); + cf, + ObservationCollector.NO_OP); } @AfterEach @@ -157,7 +159,8 @@ void shouldTryUrisOnInitializationFailure() throws Exception { ByteBufAllocator.DEFAULT, false, type -> "locator-connection", - cf); + cf, + ObservationCollector.NO_OP); verify(cf, times(3)).apply(any(Client.ClientParameters.class)); } @@ -183,7 +186,8 @@ void shouldNotOpenConnectionWhenLazyInitIsEnabled( ByteBufAllocator.DEFAULT, lazyInit, type -> "locator-connection", - cf); + cf, + ObservationCollector.NO_OP); verify(cf, times(expectedConnectionCreation)).apply(any(Client.ClientParameters.class)); } diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java index 7677d1984e..30c3f8ffb6 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java @@ -1,4 +1,4 @@ -// Copyright (c) 2021-2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2021-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). @@ -22,6 +22,7 @@ import com.rabbitmq.stream.ConfirmationHandler; import com.rabbitmq.stream.Constants; +import com.rabbitmq.stream.ObservationCollector; import com.rabbitmq.stream.StreamException; import com.rabbitmq.stream.codec.SimpleCodec; import com.rabbitmq.stream.compression.Compression; @@ -112,6 +113,7 @@ void init() { when(env.locatorOperation(any())).thenCallRealMethod(); when(env.clock()).thenReturn(clock); when(env.codec()).thenReturn(new SimpleCodec()); + when(env.observationCollector()).thenAnswer(invocation -> ObservationCollector.NO_OP); doAnswer( (Answer) invocationOnMock -> { diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java index b9d74c9f48..8bfc112bbd 100644 --- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java +++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java @@ -154,7 +154,7 @@ public static Duration waitAtMost( return Duration.ofMillis(waitedTime); } - static Address localhost() { + public static Address localhost() { return new Address("localhost", Client.DEFAULT_PORT); } @@ -555,9 +555,9 @@ interface RunnableWithException { void run() throws Exception; } - static class CountDownLatchConditions { + public static class CountDownLatchConditions { - static Condition completed() { + public static Condition completed() { return completed(Duration.ofSeconds(10)); } @@ -601,19 +601,17 @@ public void beforeAll(ExtensionContext context) { @Override public void beforeEach(ExtensionContext context) throws Exception { - try { - Field streamField = - context.getTestInstance().get().getClass().getDeclaredField("eventLoopGroup"); - streamField.setAccessible(true); - streamField.set(context.getTestInstance().get(), eventLoopGroup(context)); - } catch (NoSuchFieldException e) { - + Field field = field(context.getTestInstance().get().getClass(), "eventLoopGroup"); + if (field != null) { + field.setAccessible(true); + field.set(context.getTestInstance().get(), eventLoopGroup(context)); } - try { - Field streamField = context.getTestInstance().get().getClass().getDeclaredField("stream"); - streamField.setAccessible(true); + + field = field(context.getTestInstance().get().getClass(), "stream"); + if (field != null) { + field.setAccessible(true); String stream = streamName(context); - streamField.set(context.getTestInstance().get(), stream); + field.set(context.getTestInstance().get(), stream); Client client = new Client(new Client.ClientParameters().eventLoopGroup(eventLoopGroup(context))); Client.Response response = client.create(stream); @@ -621,8 +619,6 @@ public void beforeEach(ExtensionContext context) throws Exception { store(context.getRoot()).put("filteringSupported", client.filteringSupported()); client.close(); store(context).put("testMethodStream", stream); - } catch (NoSuchFieldException e) { - } for (Field declaredField : context.getTestInstance().get().getClass().getDeclaredFields()) { @@ -680,6 +676,18 @@ public void afterAll(ExtensionContext context) { }); } + private static Field field(Class cls, String name) { + Field field = null; + while (field == null && cls != null) { + try { + field = cls.getDeclaredField(name); + } catch (NoSuchFieldException e) { + cls = cls.getSuperclass(); + } + } + return field; + } + private static class ExecutorServiceCloseableResourceWrapper implements CloseableResource { private final ExecutorService executorService; diff --git a/src/test/java/com/rabbitmq/stream/observation/micrometer/MicrometerObservationCollectorTest.java b/src/test/java/com/rabbitmq/stream/observation/micrometer/MicrometerObservationCollectorTest.java new file mode 100644 index 0000000000..ca83e40859 --- /dev/null +++ b/src/test/java/com/rabbitmq/stream/observation/micrometer/MicrometerObservationCollectorTest.java @@ -0,0 +1,179 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.observation.micrometer; + +import static com.rabbitmq.stream.OffsetSpecification.first; +import static com.rabbitmq.stream.impl.TestUtils.CountDownLatchConditions.completed; +import static com.rabbitmq.stream.impl.TestUtils.localhost; +import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; +import static io.micrometer.tracing.test.simple.SpanAssert.assertThat; +import static org.assertj.core.api.Assertions.assertThat; + +import com.rabbitmq.stream.*; +import com.rabbitmq.stream.codec.QpidProtonCodec; +import com.rabbitmq.stream.codec.SwiftMqCodec; +import com.rabbitmq.stream.impl.TestUtils; +import io.micrometer.tracing.test.SampleTestRunner; +import io.micrometer.tracing.test.reporter.BuildingBlocks; +import io.micrometer.tracing.test.simple.SpansAssert; +import io.netty.channel.EventLoopGroup; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.extension.ExtendWith; + +public class MicrometerObservationCollectorTest { + + private static final byte[] PAYLOAD = "msg".getBytes(StandardCharsets.UTF_8); + + @ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) + private abstract static class IntegrationTest extends SampleTestRunner { + + String stream; + EventLoopGroup eventLoopGroup; + + EnvironmentBuilder environmentBuilder() { + return Environment.builder() + .netty() + .eventLoopGroup(eventLoopGroup) + .environmentBuilder() + .addressResolver(add -> localhost()); + } + + ObservationCollector observationCollector() { + return new MicrometerObservationCollectorBuilder().registry(getObservationRegistry()).build(); + } + + @Override + public TracingSetup[] getTracingSetup() { + return new TracingSetup[] {TracingSetup.IN_MEMORY_BRAVE, TracingSetup.ZIPKIN_BRAVE}; + } + + void publishConsume(Codec codec, BuildingBlocks buildingBlocks) throws Exception { + try (Environment env = + environmentBuilder().codec(codec).observationCollector(observationCollector()).build()) { + Producer producer = env.producerBuilder().stream(stream).build(); + CountDownLatch publishLatch = new CountDownLatch(1); + producer.send( + producer.messageBuilder().addData(PAYLOAD).build(), status -> publishLatch.countDown()); + + assertThat(publishLatch).is(completed()); + + CountDownLatch consumeLatch = new CountDownLatch(1); + env.consumerBuilder().stream(stream) + .offset(first()) + .messageHandler((ctx, msg) -> consumeLatch.countDown()) + .build(); + + assertThat(consumeLatch).is(completed()); + + waitAtMost(() -> buildingBlocks.getFinishedSpans().size() == 2); + + SpansAssert.assertThat(buildingBlocks.getFinishedSpans()).haveSameTraceId().hasSize(2); + assertThat(buildingBlocks.getFinishedSpans().get(0)) + .hasNameEqualTo(stream + " publish") + .hasTag("messaging.destination.name", stream) + .hasTag("messaging.message.payload_size_bytes", String.valueOf(PAYLOAD.length)) + .hasTag("net.protocol.name", "rabbitmq-stream") + .hasTag("net.protocol.version", "1.0"); + assertThat(buildingBlocks.getFinishedSpans().get(1)) + .hasNameEqualTo(stream + " process") + .hasTag("messaging.destination.name", stream) + .hasTag("messaging.source.name", stream) + .hasTag("messaging.message.payload_size_bytes", String.valueOf(PAYLOAD.length)) + .hasTag("net.protocol.name", "rabbitmq-stream") + .hasTag("net.protocol.version", "1.0"); + waitAtMost( + () -> + getMeterRegistry().find("rabbitmq.stream.publish").timer() != null + && getMeterRegistry().find("rabbitmq.stream.process").timer() != null); + getMeterRegistry() + .get("rabbitmq.stream.publish") + .tag("messaging.operation", "publish") + .tag("messaging.system", "rabbitmq") + .timer(); + getMeterRegistry() + .get("rabbitmq.stream.process") + .tag("messaging.operation", "process") + .tag("messaging.system", "rabbitmq") + .timer(); + } + } + } + + @Nested + class PublishConsumeQpidCodec extends IntegrationTest { + + @Override + public SampleTestRunnerConsumer yourCode() { + return (buildingBlocks, meterRegistry) -> + publishConsume(new QpidProtonCodec(), buildingBlocks); + } + } + + @Nested + class PublishConsumeSwiftMqCodec extends IntegrationTest { + + @Override + public SampleTestRunnerConsumer yourCode() { + return (buildingBlocks, meterRegistry) -> publishConsume(new SwiftMqCodec(), buildingBlocks); + } + } + + @Nested + class ConsumeWithoutObservationShouldNotFail extends IntegrationTest { + + @Override + public SampleTestRunnerConsumer yourCode() { + return (buildingBlocks, meterRegistry) -> { + try (Environment publishEnv = environmentBuilder().build(); + Environment consumeEnv = + environmentBuilder().observationCollector(observationCollector()).build()) { + Producer producer = publishEnv.producerBuilder().stream(stream).build(); + CountDownLatch publishLatch = new CountDownLatch(1); + producer.send( + producer.messageBuilder().addData(PAYLOAD).build(), + status -> publishLatch.countDown()); + + assertThat(publishLatch).is(completed()); + + CountDownLatch consumeLatch = new CountDownLatch(1); + consumeEnv.consumerBuilder().stream(stream) + .offset(first()) + .messageHandler((ctx, msg) -> consumeLatch.countDown()) + .build(); + + assertThat(consumeLatch).is(completed()); + + waitAtMost(() -> buildingBlocks.getFinishedSpans().size() == 1); + + SpansAssert.assertThat(buildingBlocks.getFinishedSpans()).haveSameTraceId().hasSize(1); + assertThat(buildingBlocks.getFinishedSpans().get(0)) + .hasNameEqualTo(stream + " process") + .hasTag("messaging.destination.name", stream) + .hasTag("messaging.source.name", stream) + .hasTag("messaging.message.payload_size_bytes", String.valueOf(PAYLOAD.length)) + .hasTag("net.protocol.name", "rabbitmq-stream") + .hasTag("net.protocol.version", "1.0"); + waitAtMost(() -> getMeterRegistry().find("rabbitmq.stream.process").timer() != null); + getMeterRegistry() + .get("rabbitmq.stream.process") + .tag("messaging.operation", "process") + .tag("messaging.system", "rabbitmq") + .timer(); + } + }; + } + } +}