diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java index c4a85caab..ba4adb5ff 100644 --- a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java +++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java @@ -16,7 +16,6 @@ package com.google.cloud.datastore; import static com.google.cloud.BaseService.EXCEPTION_HANDLER; -import static com.google.cloud.datastore.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY; import com.google.api.core.InternalApi; import com.google.api.gax.retrying.RetrySettings; @@ -39,9 +38,6 @@ import com.google.datastore.v1.RunAggregationQueryResponse; import com.google.datastore.v1.RunQueryRequest; import com.google.datastore.v1.RunQueryResponse; -import io.opencensus.common.Scope; -import io.opencensus.trace.Span; -import io.opencensus.trace.Status; import java.util.concurrent.Callable; /** @@ -52,7 +48,7 @@ public class RetryAndTraceDatastoreRpcDecorator implements DatastoreRpc { private final DatastoreRpc datastoreRpc; - private final TraceUtil traceUtil; + private final com.google.cloud.datastore.telemetry.TraceUtil otelTraceUtil; private final RetrySettings retrySettings; private final DatastoreOptions datastoreOptions; @@ -62,9 +58,9 @@ public RetryAndTraceDatastoreRpcDecorator( RetrySettings retrySettings, DatastoreOptions datastoreOptions) { this.datastoreRpc = datastoreRpc; - this.traceUtil = traceUtil; this.retrySettings = retrySettings; this.datastoreOptions = datastoreOptions; + this.otelTraceUtil = datastoreOptions.getTraceUtil(); } @Override @@ -106,19 +102,20 @@ public RunQueryResponse runQuery(RunQueryRequest request) { @Override public RunAggregationQueryResponse runAggregationQuery(RunAggregationQueryRequest request) { return invokeRpc( - () -> datastoreRpc.runAggregationQuery(request), SPAN_NAME_RUN_AGGREGATION_QUERY); + () -> datastoreRpc.runAggregationQuery(request), + com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY); } public O invokeRpc(Callable block, String startSpan) { - Span span = traceUtil.startSpan(startSpan); - try (Scope scope = traceUtil.getTracer().withSpan(span)) { + com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(startSpan); + try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { return RetryHelper.runWithRetries( block, this.retrySettings, EXCEPTION_HANDLER, this.datastoreOptions.getClock()); } catch (RetryHelperException e) { - span.setStatus(Status.UNKNOWN.withDescription(e.getMessage())); + span.end(e); throw DatastoreException.translateAndThrow(e); } finally { - span.end(TraceUtil.END_SPAN_OPTIONS); + span.end(); } } } diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TraceUtil.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TraceUtil.java index ccb5cfe7a..00e9f2376 100644 --- a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TraceUtil.java +++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TraceUtil.java @@ -34,6 +34,7 @@ public interface TraceUtil { static final String SPAN_NAME_LOOKUP = "Lookup"; static final String SPAN_NAME_COMMIT = "Commit"; static final String SPAN_NAME_RUN_QUERY = "RunQuery"; + static final String SPAN_NAME_RUN_AGGREGATION_QUERY = "RunAggregationQuery"; /** * Creates and returns an instance of the TraceUtil class. diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecoratorTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecoratorTest.java index b86355afa..b01bcab82 100644 --- a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecoratorTest.java +++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecoratorTest.java @@ -15,11 +15,8 @@ */ package com.google.cloud.datastore; -import static com.google.cloud.datastore.TraceUtil.END_SPAN_OPTIONS; -import static com.google.cloud.datastore.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY; import static com.google.common.truth.Truth.assertThat; import static com.google.rpc.Code.UNAVAILABLE; -import static org.easymock.EasyMock.createNiceMock; import static org.easymock.EasyMock.createStrictMock; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; @@ -29,8 +26,6 @@ import com.google.cloud.datastore.spi.v1.DatastoreRpc; import com.google.datastore.v1.RunAggregationQueryRequest; import com.google.datastore.v1.RunAggregationQueryResponse; -import io.opencensus.trace.Span; -import io.opencensus.trace.Tracer; import org.junit.Before; import org.junit.Test; @@ -49,7 +44,6 @@ public class RetryAndTraceDatastoreRpcDecoratorTest { @Before public void setUp() throws Exception { mockDatastoreRpc = createStrictMock(DatastoreRpc.class); - mockTraceUtil = createStrictMock(TraceUtil.class); datastoreRpcDecorator = new RetryAndTraceDatastoreRpcDecorator( mockDatastoreRpc, mockTraceUtil, retrySettings, datastoreOptions); @@ -57,7 +51,6 @@ public void setUp() throws Exception { @Test public void testRunAggregationQuery() { - Span mockSpan = createStrictMock(Span.class); RunAggregationQueryRequest aggregationQueryRequest = RunAggregationQueryRequest.getDefaultInstance(); RunAggregationQueryResponse aggregationQueryResponse = @@ -69,16 +62,13 @@ public void testRunAggregationQuery() { UNAVAILABLE.getNumber(), "API not accessible currently", UNAVAILABLE.name())) .times(2) .andReturn(aggregationQueryResponse); - expect(mockTraceUtil.startSpan(SPAN_NAME_RUN_AGGREGATION_QUERY)).andReturn(mockSpan); - expect(mockTraceUtil.getTracer()).andReturn(createNiceMock(Tracer.class)); - mockSpan.end(END_SPAN_OPTIONS); - replay(mockDatastoreRpc, mockTraceUtil, mockSpan); + replay(mockDatastoreRpc); RunAggregationQueryResponse actualAggregationQueryResponse = datastoreRpcDecorator.runAggregationQuery(aggregationQueryRequest); assertThat(actualAggregationQueryResponse).isSameInstanceAs(aggregationQueryResponse); - verify(mockDatastoreRpc, mockTraceUtil, mockSpan); + verify(mockDatastoreRpc); } } diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITE2ETracingTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITE2ETracingTest.java index 0212b5e42..357e748f1 100644 --- a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITE2ETracingTest.java +++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITE2ETracingTest.java @@ -16,9 +16,12 @@ package com.google.cloud.datastore.it; +import static com.google.cloud.datastore.aggregation.Aggregation.count; import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_COMMIT; import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_LOOKUP; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY; import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_QUERY; +import static com.google.common.truth.Truth.assertThat; import static io.opentelemetry.semconv.resource.attributes.ResourceAttributes.SERVICE_NAME; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -27,12 +30,16 @@ import static org.junit.Assert.assertTrue; import com.google.api.gax.rpc.NotFoundException; +import com.google.cloud.datastore.AggregationQuery; +import com.google.cloud.datastore.AggregationResult; +import com.google.cloud.datastore.AggregationResults; import com.google.cloud.datastore.Datastore; import com.google.cloud.datastore.DatastoreOptions; import com.google.cloud.datastore.Entity; import com.google.cloud.datastore.Key; import com.google.cloud.datastore.Query; import com.google.cloud.datastore.QueryResults; +import com.google.cloud.datastore.StructuredQuery; import com.google.cloud.datastore.StructuredQuery.PropertyFilter; import com.google.cloud.datastore.testing.RemoteDatastoreHelper; import com.google.cloud.opentelemetry.trace.TraceConfiguration; @@ -95,6 +102,7 @@ // 5. Traces are read-back using TraceServiceClient and verified against expected Call Stacks. @RunWith(TestParameterInjector.class) public class ITE2ETracingTest { + protected boolean isUsingGlobalOpenTelemetrySDK() { return useGlobalOpenTelemetrySDK; } @@ -214,6 +222,10 @@ private boolean dfsContainsCallStack(long spanId, List expectedCallStack private static Key KEY2; + private static Key KEY3; + + private static Key KEY4; + // Random int generator for trace ID and span ID private static Random random; @@ -309,10 +321,17 @@ public void before() throws Exception { .setNamespace(options.getNamespace()) .build(); KEY2 = + Key.newBuilder(projectId, kind1, "key3", options.getDatabaseId()) + .setNamespace(options.getNamespace()) + .build(); + KEY3 = + Key.newBuilder(projectId, kind1, "key4", options.getDatabaseId()) + .setNamespace(options.getNamespace()) + .build(); + KEY4 = Key.newBuilder(projectId, kind1, "key2", options.getDatabaseId()) .setNamespace(options.getNamespace()) .build(); - // Set up the tracer for custom TraceID injection rootSpanName = String.format("%s%d", this.getClass().getSimpleName(), System.currentTimeMillis()); @@ -658,4 +677,62 @@ public void runQueryTraceTest() throws Exception { fetchAndValidateTrace(customSpanContext.getTraceId(), SPAN_NAME_RUN_QUERY); } + + @Test + public void runAggregationQueryTraceTest() throws Exception { + Entity entity1 = + Entity.newBuilder(KEY1) + .set("pepper_name", "jalapeno") + .set("max_scoville_level", 10000) + .build(); + Entity entity2 = + Entity.newBuilder(KEY2) + .set("pepper_name", "serrano") + .set("max_scoville_level", 25000) + .build(); + Entity entity3 = + Entity.newBuilder(KEY3) + .set("pepper_name", "habanero") + .set("max_scoville_level", 350000) + .build(); + Entity entity4 = + Entity.newBuilder(KEY4) + .set("pepper_name", "ghost") + .set("max_scoville_level", 1500000) + .build(); + + List entityList = new ArrayList<>(); + entityList.add(entity1); + entityList.add(entity2); + entityList.add(entity3); + entityList.add(entity4); + + List response = datastore.add(entity1, entity2, entity3, entity4); + assertEquals(entityList, response); + + Span rootSpan = getNewRootSpanWithContext(); + try (Scope ignored = rootSpan.makeCurrent()) { + PropertyFilter mediumSpicyFilters = PropertyFilter.lt("max_scoville_level", 100000); + StructuredQuery mediumSpicyQuery = + Query.newEntityQueryBuilder() + .setKind(KEY1.getKind()) + .setFilter(mediumSpicyFilters) + .build(); + AggregationQuery countSpicyPeppers = + Query.newAggregationQueryBuilder() + .addAggregation(count().as("count")) + .over(mediumSpicyQuery) + .build(); + AggregationResults results = datastore.runAggregation(countSpicyPeppers); + assertThat(results.size()).isEqualTo(1); + AggregationResult result = results.get(0); + assertThat(result.getLong("count")).isEqualTo(2L); + } finally { + rootSpan.end(); + } + + waitForTracesToComplete(); + + fetchAndValidateTrace(customSpanContext.getTraceId(), SPAN_NAME_RUN_AGGREGATION_QUERY); + } }