diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckRequestData.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckRequestData.java index 2bbdb1d75..64bce6cc2 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckRequestData.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckRequestData.java @@ -38,7 +38,14 @@ public SettableApiFuture getMessageFutureIfExists() { return this.messageFuture.orElse(null); } + /** + * Returns an empty PubsubMessageWrapper with OpenTelemetry tracing disabled. This allows methods + * that use this method to be unit tested. + */ public PubsubMessageWrapper getMessageWrapper() { + if (this.messageWrapper == null) { + return PubsubMessageWrapper.newBuilder(null, null, false).build(); + } return messageWrapper; } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index a766efb18..26d1f253d 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -28,7 +28,6 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.ReceivedMessage; -import com.google.pubsub.v1.SubscriptionName; import io.opentelemetry.api.trace.Tracer; import java.util.ArrayList; import java.util.HashMap; @@ -408,7 +407,7 @@ void processReceivedMessages(List messages) { PubsubMessageWrapper messageWrapper = PubsubMessageWrapper.newBuilder( message.getMessage(), - SubscriptionName.parse(subscriptionName), + subscriptionName, message.getAckId(), message.getDeliveryAttempt(), enableOpenTelemetryTracing) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryUtil.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryUtil.java index 6ee6bbccb..b99db9c2a 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryUtil.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryUtil.java @@ -57,10 +57,11 @@ public static final AttributesBuilder createCommonSpanAttributesBuilder( */ public static final Span startPublishRpcSpan( Tracer tracer, - TopicName topicName, + String topic, List messages, boolean enableOpenTelemetryTracing) { if (enableOpenTelemetryTracing && tracer != null) { + TopicName topicName = TopicName.parse(topic); Attributes attributes = createCommonSpanAttributesBuilder( topicName.getTopic(), topicName.getProject(), "Publisher.publishCall", "publish") @@ -114,7 +115,7 @@ public static final void setPublishRpcSpanException( */ public static final Span startSubscribeRpcSpan( Tracer tracer, - SubscriptionName subscriptionName, + String subscription, String rpcOperation, List messages, int ackDeadline, @@ -125,6 +126,7 @@ public static final Span startSubscribeRpcSpan( rpcOperation == "ack" ? "StreamingSubscriberConnection.sendAckOperations" : "StreamingSubscriberConnection.sendModAckOperations"; + SubscriptionName subscriptionName = SubscriptionName.parse(subscription); AttributesBuilder attributesBuilder = createCommonSpanAttributesBuilder( subscriptionName.getSubscription(), diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index d5547062e..fab0c4e5c 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -488,7 +488,7 @@ private ApiFuture publishCall(OutstandingBatch outstandingBatch outstandingBatch.publishRpcSpan = OpenTelemetryUtil.startPublishRpcSpan( - tracer, TopicName.parse(topicName), messageWrappers, enableOpenTelemetryTracing); + tracer, topicName, messageWrappers, enableOpenTelemetryTracing); return publisherStub .publishCallable() diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java index 78e6625fe..cf270351c 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java @@ -103,13 +103,13 @@ public PubsubMessageWrapper(Builder builder) { } public static Builder newBuilder( - PubsubMessage message, TopicName topicName, boolean enableOpenTelemetryTracing) { + PubsubMessage message, String topicName, boolean enableOpenTelemetryTracing) { return new Builder(message, topicName, enableOpenTelemetryTracing); } public static Builder newBuilder( PubsubMessage message, - SubscriptionName subscriptionName, + String subscriptionName, String ackId, int deliveryAttempt, boolean enableOpenTelemetryTracing) { @@ -502,9 +502,26 @@ protected static final class Builder { private int deliveryAttempt = 0; private boolean enableOpenTelemetryTracing = false; - public Builder(PubsubMessage message, TopicName topicName, boolean enableOpenTelemetryTracing) { + public Builder(PubsubMessage message, String topicName, boolean enableOpenTelemetryTracing) { this.message = message; - this.topicName = topicName; + if (topicName != null) { + this.topicName = TopicName.parse(topicName); + } + this.enableOpenTelemetryTracing = enableOpenTelemetryTracing; + } + + public Builder( + PubsubMessage message, + String subscriptionName, + String ackId, + int deliveryAttempt, + boolean enableOpenTelemetryTracing) { + this.message = message; + if (subscriptionName != null) { + this.subscriptionName = SubscriptionName.parse(subscriptionName); + } + this.ackId = ackId; + this.deliveryAttempt = deliveryAttempt; this.enableOpenTelemetryTracing = enableOpenTelemetryTracing; } @@ -522,7 +539,10 @@ public Builder( } public PubsubMessageWrapper build() { - Preconditions.checkArgument(this.topicName != null || this.subscriptionName != null); + Preconditions.checkArgument( + this.enableOpenTelemetryTracing == false + || this.topicName != null + || this.subscriptionName != null); return new PubsubMessageWrapper(this); } } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 5824ba41b..17b1e7939 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -121,7 +121,6 @@ final class StreamingSubscriberConnection extends AbstractApiService implements */ private final String clientId = UUID.randomUUID().toString(); - private final String subscriptionName; private final boolean enableOpenTelemetryTracing; private final Tracer tracer; @@ -158,7 +157,6 @@ private StreamingSubscriberConnection(Builder builder) { messageDispatcherBuilder = MessageDispatcher.newBuilder(builder.receiverWithAckResponse); } - subscriptionName = builder.subscriptionName; enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing; tracer = builder.tracer; @@ -176,7 +174,7 @@ private StreamingSubscriberConnection(Builder builder) { .setExecutor(builder.executor) .setSystemExecutor(builder.systemExecutor) .setApiClock(builder.clock) - .setSubscriptionName(subscriptionName) + .setSubscriptionName(subscription) .setEnableOpenTelemetryTracing(enableOpenTelemetryTracing) .setTracer(tracer) .build(); @@ -458,13 +456,7 @@ private void sendAckOperations( // Creates an Ack span to be passed to the callback Span rpcSpan = OpenTelemetryUtil.startSubscribeRpcSpan( - tracer, - SubscriptionName.parse(subscriptionName), - "ack", - messagesInRequest, - 0, - false, - enableOpenTelemetryTracing); + tracer, subscription, "ack", messagesInRequest, 0, false, enableOpenTelemetryTracing); ApiFutureCallback callback = getCallback(ackRequestDataInRequestList, 0, false, currentBackoffMillis, rpcSpan); ApiFuture ackFuture = @@ -504,7 +496,7 @@ private void sendModackOperations( Span rpcSpan = OpenTelemetryUtil.startSubscribeRpcSpan( tracer, - SubscriptionName.parse(subscriptionName), + subscription, rpcOperation, messagesInRequest, deadlineExtensionSeconds, @@ -711,7 +703,6 @@ public static final class Builder { private ScheduledExecutorService systemExecutor; private ApiClock clock; - private String subscriptionName; private boolean enableOpenTelemetryTracing; private Tracer tracer; @@ -805,11 +796,6 @@ public Builder setClock(ApiClock clock) { return this; } - public Builder setSubscriptionName(String subscriptionName) { - this.subscriptionName = subscriptionName; - return this; - } - public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) { this.enableOpenTelemetryTracing = enableOpenTelemetryTracing; return this; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 14b6731f1..e581875a7 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -400,7 +400,6 @@ private void startStreamingConnections() { .setExecutor(executor) .setSystemExecutor(alarmsExecutor) .setClock(clock) - .setSubscriptionName(subscriptionName) .setEnableOpenTelemetryTracing(enableOpenTelemetryTracing) .setTracer(tracer) .build(); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java index 9c414d6a9..8690d0f04 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java @@ -54,8 +54,8 @@ public class OpenTelemetryTest { private static final String PUBLISH_END_EVENT = "publish end"; private static final String MESSAGING_SYSTEM_VALUE = "gcp_pubsub"; - private static final String PROJECT_ATTR_KEY = "gcp_pubsub.project_id"; - private static final String MESSAGE_SIZE_ATTR_KEY = "messaging.message.envelope.size"; + private static final String PROJECT_ATTR_KEY = "gcp.project_id"; + private static final String MESSAGE_SIZE_ATTR_KEY = "messaging.message.body.size"; private static final String ORDERING_KEY_ATTR_KEY = "messaging.gcp_pubsub.message.ordering_key"; private static final String TRACEPARENT_ATTRIBUTE = "googclient_traceparent"; @@ -70,7 +70,7 @@ public void testPublishSpansSuccess() { PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME, true).build(); List messageWrappers = Arrays.asList(messageWrapper); - long messageSize = messageWrapper.getPubsubMessage().getSerializedSize(); + long messageSize = messageWrapper.getPubsubMessage().getData().size(); Tracer tracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); // Call all span start/end methods in the expected order @@ -80,7 +80,8 @@ public void testPublishSpansSuccess() { messageWrapper.startPublishBatchingSpan(tracer); messageWrapper.endPublishBatchingSpan(); Span publishRpcSpan = - OpenTelemetryUtil.startPublishRpcSpan(tracer, FULL_TOPIC_NAME, messageWrappers, true); + OpenTelemetryUtil.startPublishRpcSpan( + tracer, FULL_TOPIC_NAME.toString(), messageWrappers, true); OpenTelemetryUtil.endPublishRpcSpan(publishRpcSpan, true); messageWrapper.setPublisherMessageIdSpanAttribute(MESSAGE_ID); messageWrapper.endPublisherSpan(); @@ -265,7 +266,8 @@ public void testPublishRpcSpanFailure() { messageWrapper.startPublisherSpan(tracer); Span publishRpcSpan = - OpenTelemetryUtil.startPublishRpcSpan(tracer, FULL_TOPIC_NAME, messageWrappers, true); + OpenTelemetryUtil.startPublishRpcSpan( + tracer, FULL_TOPIC_NAME.toString(), messageWrappers, true); Exception e = new Exception("test-exception"); OpenTelemetryUtil.setPublishRpcSpanException(publishRpcSpan, e, true);