Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.cloud.ServiceOptions;
import com.google.cloud.datastore.execution.AggregationQueryExecutor;
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
import com.google.cloud.datastore.telemetry.TraceUtil.Context;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.AbstractIterator;
Expand Down Expand Up @@ -52,6 +53,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import javax.annotation.Nonnull;

final class DatastoreImpl extends BaseService<DatastoreOptions> implements Datastore {

Expand Down Expand Up @@ -105,7 +107,10 @@ static class ReadWriteTransactionCallable<T> implements Callable<T> {
private volatile Transaction transaction;

ReadWriteTransactionCallable(
Datastore datastore, TransactionCallable<T> callable, TransactionOptions options) {
Datastore datastore,
TransactionCallable<T> callable,
TransactionOptions options,
@Nonnull Context parentTraceContext) {
this.datastore = datastore;
this.callable = callable;
this.options = options;
Expand All @@ -132,8 +137,14 @@ void setPrevTransactionId(ByteString transactionId) {

@Override
public T call() throws DatastoreException {
transaction = datastore.newTransaction(options);
try {
com.google.cloud.datastore.telemetry.TraceUtil traceUtil =
datastore.getOptions().getTraceUtil();
com.google.cloud.datastore.telemetry.TraceUtil.Span span =
traceUtil.startSpan(
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN,
datastore.getOptions().getTraceUtil().getCurrentContext());
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) {
transaction = datastore.newTransaction(options);
T value = callable.run(transaction);
transaction.commit();
return value;
Expand All @@ -154,36 +165,42 @@ public T call() throws DatastoreException {

@Override
public <T> T runInTransaction(final TransactionCallable<T> callable) {
Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_TRANSACTION);
try (Scope scope = traceUtil.getTracer().withSpan(span)) {
com.google.cloud.datastore.telemetry.TraceUtil.Span span =
otelTraceUtil.startSpan(
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN);
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) {
return RetryHelper.runWithRetries(
new ReadWriteTransactionCallable<T>(this, callable, null),
new ReadWriteTransactionCallable<T>(
this, callable, null, otelTraceUtil.getCurrentContext()),
retrySettings,
TRANSACTION_EXCEPTION_HANDLER,
getOptions().getClock());
} catch (RetryHelperException e) {
span.setStatus(Status.UNKNOWN.withDescription(e.getMessage()));
span.end(e);
throw DatastoreException.translateAndThrow(e);
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
span.end();
}
}

@Override
public <T> T runInTransaction(
final TransactionCallable<T> callable, TransactionOptions transactionOptions) {
Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_TRANSACTION);
try (Scope scope = traceUtil.getTracer().withSpan(span)) {
com.google.cloud.datastore.telemetry.TraceUtil.Span span =
otelTraceUtil.startSpan(
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN);
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) {
return RetryHelper.runWithRetries(
new ReadWriteTransactionCallable<T>(this, callable, transactionOptions),
new ReadWriteTransactionCallable<T>(
this, callable, transactionOptions, otelTraceUtil.getCurrentContext()),
retrySettings,
TRANSACTION_EXCEPTION_HANDLER,
getOptions().getClock());
} catch (RetryHelperException e) {
span.setStatus(Status.UNKNOWN.withDescription(e.getMessage()));
span.end(e);
throw DatastoreException.translateAndThrow(e);
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
span.end();
}
}

Expand Down Expand Up @@ -634,10 +651,14 @@ private com.google.datastore.v1.CommitResponse commitMutation(

com.google.datastore.v1.CommitResponse commit(
final com.google.datastore.v1.CommitRequest requestPb) {
com.google.cloud.datastore.telemetry.TraceUtil.Span span =
otelTraceUtil.startSpan(com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_COMMIT);
span.setAttribute("isTransactional", requestPb.hasTransaction());

final boolean isTransactional =
requestPb.hasTransaction() || requestPb.hasSingleUseTransaction();
final String spanName =
isTransactional
? com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_COMMIT
: com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_COMMIT;
com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(spanName);
span.setAttribute("isTransactional", isTransactional);
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) {
return RetryHelper.runWithRetries(
() -> datastoreRpc.commit(requestPb),
Expand All @@ -663,7 +684,8 @@ com.google.datastore.v1.BeginTransactionResponse beginTransaction(
final com.google.datastore.v1.BeginTransactionRequest requestPb) {
com.google.cloud.datastore.telemetry.TraceUtil.Span span =
otelTraceUtil.startSpan(
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_BEGIN_TRANSACTION);
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_BEGIN_TRANSACTION,
otelTraceUtil.getCurrentContext());
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope scope = span.makeCurrent()) {
return RetryHelper.runWithRetries(
() -> datastoreRpc.beginTransaction(requestPb),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.api.core.BetaApi;
import com.google.cloud.datastore.models.ExplainOptions;
import com.google.cloud.datastore.telemetry.TraceUtil;
import com.google.common.collect.ImmutableList;
import com.google.datastore.v1.ReadOptions;
import com.google.datastore.v1.TransactionOptions;
Expand All @@ -28,6 +29,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nonnull;

final class TransactionImpl extends BaseDatastoreBatchWriter implements Transaction {

Expand All @@ -37,6 +39,8 @@ final class TransactionImpl extends BaseDatastoreBatchWriter implements Transact

private final ReadOptionProtoPreparer readOptionProtoPreparer;

@Nonnull private final TraceUtil traceUtil;

static class ResponseImpl implements Transaction.Response {

private final com.google.datastore.v1.CommitResponse response;
Expand Down Expand Up @@ -78,6 +82,7 @@ public List<Key> getGeneratedKeys() {

transactionId = datastore.requestTransactionId(requestPb);
this.readOptionProtoPreparer = new ReadOptionProtoPreparer();
this.traceUtil = datastore.getOptions().getTraceUtil();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,13 @@ public TraceUtil.Span startSpan(String spanName, TraceUtil.Context parent) {

@Nonnull
@Override
public TraceUtil.Span currentSpan() {
public TraceUtil.Span getCurrentSpan() {
return new Span();
}

@Nonnull
@Override
public TraceUtil.Context currentContext() {
public TraceUtil.Context getCurrentContext() {
return new Context();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -300,13 +300,13 @@ public TraceUtil.Span startSpan(String spanName, TraceUtil.Context parent) {

@Nonnull
@Override
public TraceUtil.Span currentSpan() {
public TraceUtil.Span getCurrentSpan() {
return new Span(io.opentelemetry.api.trace.Span.current(), "");
}

@Nonnull
@Override
public TraceUtil.Context currentContext() {
public TraceUtil.Context getCurrentContext() {
return new Context(io.opentelemetry.context.Context.current());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public interface TraceUtil {
static final String SPAN_NAME_COMMIT = "Commit";
static final String SPAN_NAME_RUN_QUERY = "RunQuery";
static final String SPAN_NAME_RUN_AGGREGATION_QUERY = "RunAggregationQuery";
static final String SPAN_NAME_TRANSACTION_RUN = "Transaction.run";
static final String SPAN_NAME_BEGIN_TRANSACTION = "Transaction.Begin";
static final String SPAN_NAME_TRANSACTION_LOOKUP = "Transaction.Lookup";
static final String SPAN_NAME_TRANSACTION_COMMIT = "Transaction.Commit";
Expand Down Expand Up @@ -133,9 +134,9 @@ interface Scope extends AutoCloseable {

/** Returns the current span. */
@Nonnull
Span currentSpan();
Span getCurrentSpan();

/** Returns the current Context. */
@Nonnull
Context currentContext();
Context getCurrentContext();
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_LOOKUP;
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY;
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_QUERY;
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_COMMIT;
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_LOOKUP;
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN;
import static com.google.common.truth.Truth.assertThat;
import static io.opentelemetry.semconv.resource.attributes.ResourceAttributes.SERVICE_NAME;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -749,6 +751,7 @@ public void transactionalLookupTest() throws Exception {
try (Scope ignored = rootSpan.makeCurrent()) {
Transaction transaction = datastore.newTransaction();
Entity entity = datastore.get(KEY1, ReadOption.transactionId(transaction.getTransactionId()));
transaction.commit();
assertNull(entity);
} finally {
rootSpan.end();
Expand All @@ -757,18 +760,55 @@ public void transactionalLookupTest() throws Exception {

fetchAndValidateTrace(
customSpanContext.getTraceId(),
/*numExpectedSpans=*/ 2,
Collections.singletonList(Collections.singletonList(SPAN_NAME_BEGIN_TRANSACTION)));
/*numExpectedSpans=*/ 3,
Arrays.asList(
Collections.singletonList(SPAN_NAME_BEGIN_TRANSACTION),
Collections.singletonList(SPAN_NAME_TRANSACTION_LOOKUP),
Collections.singletonList(SPAN_NAME_TRANSACTION_COMMIT)));
}

@Test
public void runInTransactionQueryTest() throws Exception {
Entity entity1 = Entity.newBuilder(KEY1).set("test_field", "test_value1").build();
Entity entity2 = Entity.newBuilder(KEY2).set("test_field", "test_value2").build();
List<Entity> entityList = new ArrayList<>();
entityList.add(entity1);
entityList.add(entity2);

List<Entity> response = datastore.add(entity1, entity2);
assertEquals(entityList, response);

assertNotNull(customSpanContext);

Span rootSpan = getNewRootSpanWithContext();
try (Scope ignored = rootSpan.makeCurrent()) {
PropertyFilter filter = PropertyFilter.eq("test_field", entity1.getValue("test_field"));
Query<Entity> query =
Query.newEntityQueryBuilder().setKind(KEY1.getKind()).setFilter(filter).build();
Datastore.TransactionCallable<Boolean> callable =
transaction -> {
QueryResults<Entity> queryResults = datastore.run(query);
assertTrue(queryResults.hasNext());
assertEquals(entity1, queryResults.next());
assertFalse(queryResults.hasNext());
return true;
};
datastore.runInTransaction(callable);
} finally {
rootSpan.end();
}
waitForTracesToComplete();

fetchAndValidateTrace(
customSpanContext.getTraceId(),
/*numExpectedSpans=*/ 2,
Collections.singletonList(Collections.singletonList(SPAN_NAME_TRANSACTION_LOOKUP)));
/*numExpectedSpans=*/ 4,
Arrays.asList(
Collections.singletonList(SPAN_NAME_TRANSACTION_RUN),
Collections.singletonList(SPAN_NAME_BEGIN_TRANSACTION),
Collections.singletonList(SPAN_NAME_RUN_QUERY),
Collections.singletonList(SPAN_NAME_TRANSACTION_COMMIT)));
}

@Test
public void runInTransactionQueryTest() throws Exception {}

@Test
public void runInTransactionAggregationQueryTest() throws Exception {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,26 @@ public void disabledTraceUtilDoesNotProvideChannelConfigurator() {
@Test
public void usesDisabledContext() {
DisabledTraceUtil traceUtil = new DisabledTraceUtil();
assertThat(traceUtil.currentContext() instanceof DisabledTraceUtil.Context).isTrue();
assertThat(traceUtil.getCurrentContext() instanceof DisabledTraceUtil.Context).isTrue();
}

@Test
public void usesDisabledSpan() {
DisabledTraceUtil traceUtil = new DisabledTraceUtil();
assertThat(traceUtil.currentSpan() instanceof DisabledTraceUtil.Span).isTrue();
assertThat(traceUtil.getCurrentSpan() instanceof DisabledTraceUtil.Span).isTrue();
assertThat(traceUtil.startSpan("foo") instanceof DisabledTraceUtil.Span).isTrue();
assertThat(
traceUtil.startSpan("foo", traceUtil.currentContext())
traceUtil.startSpan("foo", traceUtil.getCurrentContext())
instanceof DisabledTraceUtil.Span)
.isTrue();
}

@Test
public void usesDisabledScope() {
DisabledTraceUtil traceUtil = new DisabledTraceUtil();
assertThat(traceUtil.currentContext().makeCurrent() instanceof DisabledTraceUtil.Scope)
assertThat(traceUtil.getCurrentContext().makeCurrent() instanceof DisabledTraceUtil.Scope)
.isTrue();
assertThat(traceUtil.getCurrentSpan().makeCurrent() instanceof DisabledTraceUtil.Scope)
.isTrue();
assertThat(traceUtil.currentSpan().makeCurrent() instanceof DisabledTraceUtil.Scope).isTrue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,23 +82,26 @@ public void enabledTraceUtilProvidesChannelConfigurator() {

@Test
public void usesEnabledContext() {
assertThat(newEnabledTraceUtil().currentContext() instanceof EnabledTraceUtil.Context).isTrue();
assertThat(newEnabledTraceUtil().getCurrentContext() instanceof EnabledTraceUtil.Context)
.isTrue();
}

@Test
public void usesEnabledSpan() {
EnabledTraceUtil traceUtil = newEnabledTraceUtil();
assertThat(traceUtil.currentSpan() instanceof EnabledTraceUtil.Span).isTrue();
assertThat(traceUtil.getCurrentSpan() instanceof EnabledTraceUtil.Span).isTrue();
assertThat(traceUtil.startSpan("foo") != null).isTrue();
assertThat(
traceUtil.startSpan("foo", traceUtil.currentContext()) instanceof EnabledTraceUtil.Span)
traceUtil.startSpan("foo", traceUtil.getCurrentContext())
instanceof EnabledTraceUtil.Span)
.isTrue();
}

@Test
public void usesEnabledScope() {
EnabledTraceUtil traceUtil = newEnabledTraceUtil();
assertThat(traceUtil.currentContext().makeCurrent() instanceof EnabledTraceUtil.Scope).isTrue();
assertThat(traceUtil.currentSpan().makeCurrent() instanceof EnabledTraceUtil.Scope).isTrue();
assertThat(traceUtil.getCurrentContext().makeCurrent() instanceof EnabledTraceUtil.Scope)
.isTrue();
assertThat(traceUtil.getCurrentSpan().makeCurrent() instanceof EnabledTraceUtil.Scope).isTrue();
}
}