diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java index 66bc6c0ba..eea94cc60 100644 --- a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java +++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java @@ -25,6 +25,7 @@ import com.google.cloud.ServiceOptions; import com.google.cloud.datastore.execution.AggregationQueryExecutor; import com.google.cloud.datastore.spi.v1.DatastoreRpc; +import com.google.cloud.datastore.telemetry.TraceUtil.Context; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.collect.AbstractIterator; @@ -52,6 +53,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; +import javax.annotation.Nonnull; final class DatastoreImpl extends BaseService implements Datastore { @@ -105,7 +107,10 @@ static class ReadWriteTransactionCallable implements Callable { private volatile Transaction transaction; ReadWriteTransactionCallable( - Datastore datastore, TransactionCallable callable, TransactionOptions options) { + Datastore datastore, + TransactionCallable callable, + TransactionOptions options, + @Nonnull Context parentTraceContext) { this.datastore = datastore; this.callable = callable; this.options = options; @@ -132,8 +137,14 @@ void setPrevTransactionId(ByteString transactionId) { @Override public T call() throws DatastoreException { - transaction = datastore.newTransaction(options); - try { + com.google.cloud.datastore.telemetry.TraceUtil traceUtil = + datastore.getOptions().getTraceUtil(); + com.google.cloud.datastore.telemetry.TraceUtil.Span span = + traceUtil.startSpan( + com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN, + datastore.getOptions().getTraceUtil().getCurrentContext()); + try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { + transaction = datastore.newTransaction(options); T value = callable.run(transaction); transaction.commit(); return value; @@ -154,36 +165,42 @@ public T call() throws DatastoreException { @Override public T runInTransaction(final TransactionCallable callable) { - Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_TRANSACTION); - try (Scope scope = traceUtil.getTracer().withSpan(span)) { + com.google.cloud.datastore.telemetry.TraceUtil.Span span = + otelTraceUtil.startSpan( + com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN); + try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { return RetryHelper.runWithRetries( - new ReadWriteTransactionCallable(this, callable, null), + new ReadWriteTransactionCallable( + this, callable, null, otelTraceUtil.getCurrentContext()), retrySettings, TRANSACTION_EXCEPTION_HANDLER, getOptions().getClock()); } catch (RetryHelperException e) { - span.setStatus(Status.UNKNOWN.withDescription(e.getMessage())); + span.end(e); throw DatastoreException.translateAndThrow(e); } finally { - span.end(TraceUtil.END_SPAN_OPTIONS); + span.end(); } } @Override public T runInTransaction( final TransactionCallable callable, TransactionOptions transactionOptions) { - Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_TRANSACTION); - try (Scope scope = traceUtil.getTracer().withSpan(span)) { + com.google.cloud.datastore.telemetry.TraceUtil.Span span = + otelTraceUtil.startSpan( + com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN); + try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { return RetryHelper.runWithRetries( - new ReadWriteTransactionCallable(this, callable, transactionOptions), + new ReadWriteTransactionCallable( + this, callable, transactionOptions, otelTraceUtil.getCurrentContext()), retrySettings, TRANSACTION_EXCEPTION_HANDLER, getOptions().getClock()); } catch (RetryHelperException e) { - span.setStatus(Status.UNKNOWN.withDescription(e.getMessage())); + span.end(e); throw DatastoreException.translateAndThrow(e); } finally { - span.end(TraceUtil.END_SPAN_OPTIONS); + span.end(); } } @@ -634,10 +651,14 @@ private com.google.datastore.v1.CommitResponse commitMutation( com.google.datastore.v1.CommitResponse commit( final com.google.datastore.v1.CommitRequest requestPb) { - com.google.cloud.datastore.telemetry.TraceUtil.Span span = - otelTraceUtil.startSpan(com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_COMMIT); - span.setAttribute("isTransactional", requestPb.hasTransaction()); - + final boolean isTransactional = + requestPb.hasTransaction() || requestPb.hasSingleUseTransaction(); + final String spanName = + isTransactional + ? com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_COMMIT + : com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_COMMIT; + com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(spanName); + span.setAttribute("isTransactional", isTransactional); try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { return RetryHelper.runWithRetries( () -> datastoreRpc.commit(requestPb), @@ -663,7 +684,8 @@ com.google.datastore.v1.BeginTransactionResponse beginTransaction( final com.google.datastore.v1.BeginTransactionRequest requestPb) { com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan( - com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_BEGIN_TRANSACTION); + com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_BEGIN_TRANSACTION, + otelTraceUtil.getCurrentContext()); try (com.google.cloud.datastore.telemetry.TraceUtil.Scope scope = span.makeCurrent()) { return RetryHelper.runWithRetries( () -> datastoreRpc.beginTransaction(requestPb), diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/TransactionImpl.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/TransactionImpl.java index f08a908ec..e730db81f 100644 --- a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/TransactionImpl.java +++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/TransactionImpl.java @@ -20,6 +20,7 @@ import com.google.api.core.BetaApi; import com.google.cloud.datastore.models.ExplainOptions; +import com.google.cloud.datastore.telemetry.TraceUtil; import com.google.common.collect.ImmutableList; import com.google.datastore.v1.ReadOptions; import com.google.datastore.v1.TransactionOptions; @@ -28,6 +29,7 @@ import java.util.Iterator; import java.util.List; import java.util.Optional; +import javax.annotation.Nonnull; final class TransactionImpl extends BaseDatastoreBatchWriter implements Transaction { @@ -37,6 +39,8 @@ final class TransactionImpl extends BaseDatastoreBatchWriter implements Transact private final ReadOptionProtoPreparer readOptionProtoPreparer; + @Nonnull private final TraceUtil traceUtil; + static class ResponseImpl implements Transaction.Response { private final com.google.datastore.v1.CommitResponse response; @@ -78,6 +82,7 @@ public List getGeneratedKeys() { transactionId = datastore.requestTransactionId(requestPb); this.readOptionProtoPreparer = new ReadOptionProtoPreparer(); + this.traceUtil = datastore.getOptions().getTraceUtil(); } @Override diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/DisabledTraceUtil.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/DisabledTraceUtil.java index a4f25813a..2a42081a1 100644 --- a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/DisabledTraceUtil.java +++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/DisabledTraceUtil.java @@ -102,13 +102,13 @@ public TraceUtil.Span startSpan(String spanName, TraceUtil.Context parent) { @Nonnull @Override - public TraceUtil.Span currentSpan() { + public TraceUtil.Span getCurrentSpan() { return new Span(); } @Nonnull @Override - public TraceUtil.Context currentContext() { + public TraceUtil.Context getCurrentContext() { return new Context(); } } diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java index 3bf6a7466..50f89369a 100644 --- a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java +++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java @@ -300,13 +300,13 @@ public TraceUtil.Span startSpan(String spanName, TraceUtil.Context parent) { @Nonnull @Override - public TraceUtil.Span currentSpan() { + public TraceUtil.Span getCurrentSpan() { return new Span(io.opentelemetry.api.trace.Span.current(), ""); } @Nonnull @Override - public TraceUtil.Context currentContext() { + public TraceUtil.Context getCurrentContext() { return new Context(io.opentelemetry.context.Context.current()); } } diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TraceUtil.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TraceUtil.java index d43c36a07..8f45e2b62 100644 --- a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TraceUtil.java +++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TraceUtil.java @@ -35,6 +35,7 @@ public interface TraceUtil { static final String SPAN_NAME_COMMIT = "Commit"; static final String SPAN_NAME_RUN_QUERY = "RunQuery"; static final String SPAN_NAME_RUN_AGGREGATION_QUERY = "RunAggregationQuery"; + static final String SPAN_NAME_TRANSACTION_RUN = "Transaction.run"; static final String SPAN_NAME_BEGIN_TRANSACTION = "Transaction.Begin"; static final String SPAN_NAME_TRANSACTION_LOOKUP = "Transaction.Lookup"; static final String SPAN_NAME_TRANSACTION_COMMIT = "Transaction.Commit"; @@ -133,9 +134,9 @@ interface Scope extends AutoCloseable { /** Returns the current span. */ @Nonnull - Span currentSpan(); + Span getCurrentSpan(); /** Returns the current Context. */ @Nonnull - Context currentContext(); + Context getCurrentContext(); } diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITE2ETracingTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITE2ETracingTest.java index 67df8c44b..d2cffc5db 100644 --- a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITE2ETracingTest.java +++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITE2ETracingTest.java @@ -22,7 +22,9 @@ import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_LOOKUP; import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY; import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_QUERY; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_COMMIT; import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_LOOKUP; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN; import static com.google.common.truth.Truth.assertThat; import static io.opentelemetry.semconv.resource.attributes.ResourceAttributes.SERVICE_NAME; import static org.junit.Assert.assertEquals; @@ -749,6 +751,7 @@ public void transactionalLookupTest() throws Exception { try (Scope ignored = rootSpan.makeCurrent()) { Transaction transaction = datastore.newTransaction(); Entity entity = datastore.get(KEY1, ReadOption.transactionId(transaction.getTransactionId())); + transaction.commit(); assertNull(entity); } finally { rootSpan.end(); @@ -757,18 +760,55 @@ public void transactionalLookupTest() throws Exception { fetchAndValidateTrace( customSpanContext.getTraceId(), - /*numExpectedSpans=*/ 2, - Collections.singletonList(Collections.singletonList(SPAN_NAME_BEGIN_TRANSACTION))); + /*numExpectedSpans=*/ 3, + Arrays.asList( + Collections.singletonList(SPAN_NAME_BEGIN_TRANSACTION), + Collections.singletonList(SPAN_NAME_TRANSACTION_LOOKUP), + Collections.singletonList(SPAN_NAME_TRANSACTION_COMMIT))); + } + + @Test + public void runInTransactionQueryTest() throws Exception { + Entity entity1 = Entity.newBuilder(KEY1).set("test_field", "test_value1").build(); + Entity entity2 = Entity.newBuilder(KEY2).set("test_field", "test_value2").build(); + List entityList = new ArrayList<>(); + entityList.add(entity1); + entityList.add(entity2); + + List response = datastore.add(entity1, entity2); + assertEquals(entityList, response); + + assertNotNull(customSpanContext); + + Span rootSpan = getNewRootSpanWithContext(); + try (Scope ignored = rootSpan.makeCurrent()) { + PropertyFilter filter = PropertyFilter.eq("test_field", entity1.getValue("test_field")); + Query query = + Query.newEntityQueryBuilder().setKind(KEY1.getKind()).setFilter(filter).build(); + Datastore.TransactionCallable callable = + transaction -> { + QueryResults queryResults = datastore.run(query); + assertTrue(queryResults.hasNext()); + assertEquals(entity1, queryResults.next()); + assertFalse(queryResults.hasNext()); + return true; + }; + datastore.runInTransaction(callable); + } finally { + rootSpan.end(); + } + waitForTracesToComplete(); fetchAndValidateTrace( customSpanContext.getTraceId(), - /*numExpectedSpans=*/ 2, - Collections.singletonList(Collections.singletonList(SPAN_NAME_TRANSACTION_LOOKUP))); + /*numExpectedSpans=*/ 4, + Arrays.asList( + Collections.singletonList(SPAN_NAME_TRANSACTION_RUN), + Collections.singletonList(SPAN_NAME_BEGIN_TRANSACTION), + Collections.singletonList(SPAN_NAME_RUN_QUERY), + Collections.singletonList(SPAN_NAME_TRANSACTION_COMMIT))); } - @Test - public void runInTransactionQueryTest() throws Exception {} - @Test public void runInTransactionAggregationQueryTest() throws Exception {} diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/DisabledTraceUtilTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/DisabledTraceUtilTest.java index 0f3f183cd..a24f55597 100644 --- a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/DisabledTraceUtilTest.java +++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/DisabledTraceUtilTest.java @@ -29,16 +29,16 @@ public void disabledTraceUtilDoesNotProvideChannelConfigurator() { @Test public void usesDisabledContext() { DisabledTraceUtil traceUtil = new DisabledTraceUtil(); - assertThat(traceUtil.currentContext() instanceof DisabledTraceUtil.Context).isTrue(); + assertThat(traceUtil.getCurrentContext() instanceof DisabledTraceUtil.Context).isTrue(); } @Test public void usesDisabledSpan() { DisabledTraceUtil traceUtil = new DisabledTraceUtil(); - assertThat(traceUtil.currentSpan() instanceof DisabledTraceUtil.Span).isTrue(); + assertThat(traceUtil.getCurrentSpan() instanceof DisabledTraceUtil.Span).isTrue(); assertThat(traceUtil.startSpan("foo") instanceof DisabledTraceUtil.Span).isTrue(); assertThat( - traceUtil.startSpan("foo", traceUtil.currentContext()) + traceUtil.startSpan("foo", traceUtil.getCurrentContext()) instanceof DisabledTraceUtil.Span) .isTrue(); } @@ -46,8 +46,9 @@ public void usesDisabledSpan() { @Test public void usesDisabledScope() { DisabledTraceUtil traceUtil = new DisabledTraceUtil(); - assertThat(traceUtil.currentContext().makeCurrent() instanceof DisabledTraceUtil.Scope) + assertThat(traceUtil.getCurrentContext().makeCurrent() instanceof DisabledTraceUtil.Scope) + .isTrue(); + assertThat(traceUtil.getCurrentSpan().makeCurrent() instanceof DisabledTraceUtil.Scope) .isTrue(); - assertThat(traceUtil.currentSpan().makeCurrent() instanceof DisabledTraceUtil.Scope).isTrue(); } } diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/EnabledTraceUtilTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/EnabledTraceUtilTest.java index e88e1a849..2497672d9 100644 --- a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/EnabledTraceUtilTest.java +++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/EnabledTraceUtilTest.java @@ -82,23 +82,26 @@ public void enabledTraceUtilProvidesChannelConfigurator() { @Test public void usesEnabledContext() { - assertThat(newEnabledTraceUtil().currentContext() instanceof EnabledTraceUtil.Context).isTrue(); + assertThat(newEnabledTraceUtil().getCurrentContext() instanceof EnabledTraceUtil.Context) + .isTrue(); } @Test public void usesEnabledSpan() { EnabledTraceUtil traceUtil = newEnabledTraceUtil(); - assertThat(traceUtil.currentSpan() instanceof EnabledTraceUtil.Span).isTrue(); + assertThat(traceUtil.getCurrentSpan() instanceof EnabledTraceUtil.Span).isTrue(); assertThat(traceUtil.startSpan("foo") != null).isTrue(); assertThat( - traceUtil.startSpan("foo", traceUtil.currentContext()) instanceof EnabledTraceUtil.Span) + traceUtil.startSpan("foo", traceUtil.getCurrentContext()) + instanceof EnabledTraceUtil.Span) .isTrue(); } @Test public void usesEnabledScope() { EnabledTraceUtil traceUtil = newEnabledTraceUtil(); - assertThat(traceUtil.currentContext().makeCurrent() instanceof EnabledTraceUtil.Scope).isTrue(); - assertThat(traceUtil.currentSpan().makeCurrent() instanceof EnabledTraceUtil.Scope).isTrue(); + assertThat(traceUtil.getCurrentContext().makeCurrent() instanceof EnabledTraceUtil.Scope) + .isTrue(); + assertThat(traceUtil.getCurrentSpan().makeCurrent() instanceof EnabledTraceUtil.Scope).isTrue(); } }