Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,14 @@ public SettableApiFuture<AckResponse> getMessageFutureIfExists() {
return this.messageFuture.orElse(null);
}

/**
* Returns an empty PubsubMessageWrapper with OpenTelemetry tracing disabled. This allows methods
* that use this method to be unit tested.
*/
public PubsubMessageWrapper getMessageWrapper() {
if (this.messageWrapper == null) {
return PubsubMessageWrapper.newBuilder(null, null, false).build();
}
return messageWrapper;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import com.google.pubsub.v1.SubscriptionName;
import io.opentelemetry.api.trace.Tracer;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -408,7 +407,7 @@ void processReceivedMessages(List<ReceivedMessage> messages) {
PubsubMessageWrapper messageWrapper =
PubsubMessageWrapper.newBuilder(
message.getMessage(),
SubscriptionName.parse(subscriptionName),
subscriptionName,
message.getAckId(),
message.getDeliveryAttempt(),
enableOpenTelemetryTracing)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,11 @@ public static final AttributesBuilder createCommonSpanAttributesBuilder(
*/
public static final Span startPublishRpcSpan(
Tracer tracer,
TopicName topicName,
String topic,
List<PubsubMessageWrapper> messages,
boolean enableOpenTelemetryTracing) {
if (enableOpenTelemetryTracing && tracer != null) {
TopicName topicName = TopicName.parse(topic);
Attributes attributes =
createCommonSpanAttributesBuilder(
topicName.getTopic(), topicName.getProject(), "Publisher.publishCall", "publish")
Expand Down Expand Up @@ -114,7 +115,7 @@ public static final void setPublishRpcSpanException(
*/
public static final Span startSubscribeRpcSpan(
Tracer tracer,
SubscriptionName subscriptionName,
String subscription,
String rpcOperation,
List<PubsubMessageWrapper> messages,
int ackDeadline,
Expand All @@ -125,6 +126,7 @@ public static final Span startSubscribeRpcSpan(
rpcOperation == "ack"
? "StreamingSubscriberConnection.sendAckOperations"
: "StreamingSubscriberConnection.sendModAckOperations";
SubscriptionName subscriptionName = SubscriptionName.parse(subscription);
AttributesBuilder attributesBuilder =
createCommonSpanAttributesBuilder(
subscriptionName.getSubscription(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch

outstandingBatch.publishRpcSpan =
OpenTelemetryUtil.startPublishRpcSpan(
tracer, TopicName.parse(topicName), messageWrappers, enableOpenTelemetryTracing);
tracer, topicName, messageWrappers, enableOpenTelemetryTracing);

return publisherStub
.publishCallable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,13 @@ public PubsubMessageWrapper(Builder builder) {
}

public static Builder newBuilder(
PubsubMessage message, TopicName topicName, boolean enableOpenTelemetryTracing) {
PubsubMessage message, String topicName, boolean enableOpenTelemetryTracing) {
return new Builder(message, topicName, enableOpenTelemetryTracing);
}

public static Builder newBuilder(
PubsubMessage message,
SubscriptionName subscriptionName,
String subscriptionName,
String ackId,
int deliveryAttempt,
boolean enableOpenTelemetryTracing) {
Expand Down Expand Up @@ -502,9 +502,26 @@ protected static final class Builder {
private int deliveryAttempt = 0;
private boolean enableOpenTelemetryTracing = false;

public Builder(PubsubMessage message, TopicName topicName, boolean enableOpenTelemetryTracing) {
public Builder(PubsubMessage message, String topicName, boolean enableOpenTelemetryTracing) {
this.message = message;
this.topicName = topicName;
if (topicName != null) {
this.topicName = TopicName.parse(topicName);
}
this.enableOpenTelemetryTracing = enableOpenTelemetryTracing;
}

public Builder(
PubsubMessage message,
String subscriptionName,
String ackId,
int deliveryAttempt,
boolean enableOpenTelemetryTracing) {
this.message = message;
if (subscriptionName != null) {
this.subscriptionName = SubscriptionName.parse(subscriptionName);
}
this.ackId = ackId;
this.deliveryAttempt = deliveryAttempt;
this.enableOpenTelemetryTracing = enableOpenTelemetryTracing;
}

Expand All @@ -522,7 +539,10 @@ public Builder(
}

public PubsubMessageWrapper build() {
Preconditions.checkArgument(this.topicName != null || this.subscriptionName != null);
Preconditions.checkArgument(
this.enableOpenTelemetryTracing == false
|| this.topicName != null
|| this.subscriptionName != null);
return new PubsubMessageWrapper(this);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
*/
private final String clientId = UUID.randomUUID().toString();

private final String subscriptionName;
private final boolean enableOpenTelemetryTracing;
private final Tracer tracer;

Expand Down Expand Up @@ -158,7 +157,6 @@ private StreamingSubscriberConnection(Builder builder) {
messageDispatcherBuilder = MessageDispatcher.newBuilder(builder.receiverWithAckResponse);
}

subscriptionName = builder.subscriptionName;
enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing;
tracer = builder.tracer;

Expand All @@ -176,7 +174,7 @@ private StreamingSubscriberConnection(Builder builder) {
.setExecutor(builder.executor)
.setSystemExecutor(builder.systemExecutor)
.setApiClock(builder.clock)
.setSubscriptionName(subscriptionName)
.setSubscriptionName(subscription)
.setEnableOpenTelemetryTracing(enableOpenTelemetryTracing)
.setTracer(tracer)
.build();
Expand Down Expand Up @@ -458,13 +456,7 @@ private void sendAckOperations(
// Creates an Ack span to be passed to the callback
Span rpcSpan =
OpenTelemetryUtil.startSubscribeRpcSpan(
tracer,
SubscriptionName.parse(subscriptionName),
"ack",
messagesInRequest,
0,
false,
enableOpenTelemetryTracing);
tracer, subscription, "ack", messagesInRequest, 0, false, enableOpenTelemetryTracing);
ApiFutureCallback<Empty> callback =
getCallback(ackRequestDataInRequestList, 0, false, currentBackoffMillis, rpcSpan);
ApiFuture<Empty> ackFuture =
Expand Down Expand Up @@ -504,7 +496,7 @@ private void sendModackOperations(
Span rpcSpan =
OpenTelemetryUtil.startSubscribeRpcSpan(
tracer,
SubscriptionName.parse(subscriptionName),
subscription,
rpcOperation,
messagesInRequest,
deadlineExtensionSeconds,
Expand Down Expand Up @@ -711,7 +703,6 @@ public static final class Builder {
private ScheduledExecutorService systemExecutor;
private ApiClock clock;

private String subscriptionName;
private boolean enableOpenTelemetryTracing;
private Tracer tracer;

Expand Down Expand Up @@ -805,11 +796,6 @@ public Builder setClock(ApiClock clock) {
return this;
}

public Builder setSubscriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
return this;
}

public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) {
this.enableOpenTelemetryTracing = enableOpenTelemetryTracing;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,6 @@ private void startStreamingConnections() {
.setExecutor(executor)
.setSystemExecutor(alarmsExecutor)
.setClock(clock)
.setSubscriptionName(subscriptionName)
.setEnableOpenTelemetryTracing(enableOpenTelemetryTracing)
.setTracer(tracer)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public class OpenTelemetryTest {
private static final String PUBLISH_END_EVENT = "publish end";

private static final String MESSAGING_SYSTEM_VALUE = "gcp_pubsub";
private static final String PROJECT_ATTR_KEY = "gcp_pubsub.project_id";
private static final String MESSAGE_SIZE_ATTR_KEY = "messaging.message.envelope.size";
private static final String PROJECT_ATTR_KEY = "gcp.project_id";
private static final String MESSAGE_SIZE_ATTR_KEY = "messaging.message.body.size";
private static final String ORDERING_KEY_ATTR_KEY = "messaging.gcp_pubsub.message.ordering_key";

private static final String TRACEPARENT_ATTRIBUTE = "googclient_traceparent";
Expand All @@ -70,7 +70,7 @@ public void testPublishSpansSuccess() {
PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME, true).build();
List<PubsubMessageWrapper> messageWrappers = Arrays.asList(messageWrapper);

long messageSize = messageWrapper.getPubsubMessage().getSerializedSize();
long messageSize = messageWrapper.getPubsubMessage().getData().size();
Tracer tracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");

// Call all span start/end methods in the expected order
Expand All @@ -80,7 +80,8 @@ public void testPublishSpansSuccess() {
messageWrapper.startPublishBatchingSpan(tracer);
messageWrapper.endPublishBatchingSpan();
Span publishRpcSpan =
OpenTelemetryUtil.startPublishRpcSpan(tracer, FULL_TOPIC_NAME, messageWrappers, true);
OpenTelemetryUtil.startPublishRpcSpan(
tracer, FULL_TOPIC_NAME.toString(), messageWrappers, true);
OpenTelemetryUtil.endPublishRpcSpan(publishRpcSpan, true);
messageWrapper.setPublisherMessageIdSpanAttribute(MESSAGE_ID);
messageWrapper.endPublisherSpan();
Expand Down Expand Up @@ -265,7 +266,8 @@ public void testPublishRpcSpanFailure() {

messageWrapper.startPublisherSpan(tracer);
Span publishRpcSpan =
OpenTelemetryUtil.startPublishRpcSpan(tracer, FULL_TOPIC_NAME, messageWrappers, true);
OpenTelemetryUtil.startPublishRpcSpan(
tracer, FULL_TOPIC_NAME.toString(), messageWrappers, true);

Exception e = new Exception("test-exception");
OpenTelemetryUtil.setPublishRpcSpanException(publishRpcSpan, e, true);
Expand Down