diff --git a/google-cloud-bigtable/clirr-ignored-differences.xml b/google-cloud-bigtable/clirr-ignored-differences.xml index f6411f9ee3..e8e7f3ab0a 100644 --- a/google-cloud-bigtable/clirr-ignored-differences.xml +++ b/google-cloud-bigtable/clirr-ignored-differences.xml @@ -156,6 +156,12 @@ 8001 com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerBatchedUnaryCallable + + + 7004 + com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerFactory + * + 6001 diff --git a/google-cloud-bigtable/pom.xml b/google-cloud-bigtable/pom.xml index c95d989edc..78e4b55776 100644 --- a/google-cloud-bigtable/pom.xml +++ b/google-cloud-bigtable/pom.xml @@ -64,19 +64,6 @@ - - com.google.cloud - google-cloud-bigtable-stats - - - - io.opencensus - * - - - com.google.api @@ -339,6 +326,10 @@ io.opentelemetry opentelemetry-api + + io.opentelemetry + opentelemetry-sdk + io.opentelemetry opentelemetry-sdk-metrics @@ -347,6 +338,11 @@ io.opentelemetry opentelemetry-sdk-common + + io.opentelemetry + opentelemetry-sdk-testing + test + com.google.cloud google-cloud-monitoring diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java index 701a5e8e49..928159aa6d 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java @@ -25,19 +25,16 @@ import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.rpc.UnaryCallSettings; import com.google.auth.Credentials; -import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.bigtable.data.v2.models.Query; import com.google.cloud.bigtable.data.v2.models.Row; import com.google.cloud.bigtable.data.v2.stub.BigtableBatchingCallSettings; import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings; -import com.google.cloud.bigtable.stats.BigtableStackdriverStatsExporter; -import com.google.cloud.bigtable.stats.BuiltinViews; +import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsProvider; import com.google.common.base.MoreObjects; import com.google.common.base.Strings; import io.grpc.ManagedChannelBuilder; import java.io.IOException; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -77,7 +74,10 @@ public final class BigtableDataSettings { private static final Logger LOGGER = Logger.getLogger(BigtableDataSettings.class.getName()); private static final String BIGTABLE_EMULATOR_HOST_ENV_VAR = "BIGTABLE_EMULATOR_HOST"; - private static final AtomicBoolean BUILTIN_METRICS_REGISTERED = new AtomicBoolean(false); + // This is the legacy credential override used in the deprecated enableBuiltinMetrics method to + // override the default credentials set on the Bigtable client. Keeping it for backward + // compatibility. + @Deprecated @Nullable private static Credentials legacyMetricCredentialOverride; private final EnhancedBigtableStubSettings stubSettings; @@ -197,23 +197,34 @@ public static void enableGfeOpenCensusStats() { com.google.cloud.bigtable.data.v2.stub.metrics.RpcViews.registerBigtableClientGfeViews(); } - /** Register built in metrics. 
*/ - public static void enableBuiltinMetrics() throws IOException { - if (BUILTIN_METRICS_REGISTERED.compareAndSet(false, true)) { - BuiltinViews.registerBigtableBuiltinViews(); - BigtableStackdriverStatsExporter.register(GoogleCredentials.getApplicationDefault()); - } - } + /** + * Register built in metrics. + * + * @deprecated This is a no-op that doesn't do anything. Builtin metrics are enabled by default + * now. Please refer to {@link + * BigtableDataSettings.Builder#setMetricsProvider(MetricsProvider)} on how to enable or + * disable built-in metrics. + */ + @Deprecated + public static void enableBuiltinMetrics() throws IOException {} /** * Register built in metrics with credentials. The credentials need to have metric write access * for all the projects you're publishing to. + * + * @deprecated This is a no-op that doesn't do anything. Builtin metrics are enabled by default + * now. Please refer {@link BigtableDataSettings.Builder#setMetricsProvider(MetricsProvider)} + * on how to enable or disable built-in metrics. */ + @Deprecated public static void enableBuiltinMetrics(Credentials credentials) throws IOException { - if (BUILTIN_METRICS_REGISTERED.compareAndSet(false, true)) { - BuiltinViews.registerBigtableBuiltinViews(); - BigtableStackdriverStatsExporter.register(credentials); - } + BigtableDataSettings.legacyMetricCredentialOverride = credentials; + } + + /** Get the metrics credentials if it's set by {@link #enableBuiltinMetrics(Credentials)}. */ + @InternalApi + public static Credentials getMetricsCredentials() { + return legacyMetricCredentialOverride; } /** Returns the target project id. */ @@ -278,6 +289,11 @@ public boolean isBulkMutationFlowControlEnabled() { return stubSettings.bulkMutateRowsSettings().isServerInitiatedFlowControlEnabled(); } + /** Gets the {@link MetricsProvider}. * */ + public MetricsProvider getMetricsProvider() { + return stubSettings.getMetricsProvider(); + } + /** Returns the underlying RPC settings. */ public EnhancedBigtableStubSettings getStubSettings() { return stubSettings; @@ -527,6 +543,30 @@ public boolean isBulkMutationFlowControlEnabled() { return stubSettings.bulkMutateRowsSettings().isServerInitiatedFlowControlEnabled(); } + /** + * Sets the {@link MetricsProvider}. + * + *

<p>By default, this is set to {@link + * com.google.cloud.bigtable.data.v2.stub.metrics.DefaultMetricsProvider#INSTANCE} which will + * collect and export client side metrics. + * + *

<p>To disable client side metrics, set it to {@link + * com.google.cloud.bigtable.data.v2.stub.metrics.NoopMetricsProvider#INSTANCE}. + * + *

To use a custom OpenTelemetry instance, refer to {@link + * com.google.cloud.bigtable.data.v2.stub.metrics.CustomOpenTelemetryMetricsProvider} on how to + * set it up. + */ + public Builder setMetricsProvider(MetricsProvider metricsProvider) { + stubSettings.setMetricsProvider(metricsProvider); + return this; + } + + /** Gets the {@link MetricsProvider}. */ + public MetricsProvider getMetricsProvider() { + return stubSettings.getMetricsProvider(); + } + /** * Returns the underlying settings for making RPC calls. The settings should be changed with * care. diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index ddd69324ca..8b563b0f6f 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -15,6 +15,10 @@ */ package com.google.cloud.bigtable.data.v2.stub; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.APP_PROFILE_KEY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.INSTANCE_ID_KEY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.PROJECT_ID_KEY; + import com.google.api.core.BetaApi; import com.google.api.core.InternalApi; import com.google.api.gax.batching.Batcher; @@ -68,6 +72,7 @@ import com.google.bigtable.v2.SampleRowKeysRequest; import com.google.bigtable.v2.SampleRowKeysResponse; import com.google.cloud.bigtable.Version; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; import com.google.cloud.bigtable.data.v2.internal.JwtCredentialsWithAudience; import com.google.cloud.bigtable.data.v2.internal.RequestContext; import com.google.cloud.bigtable.data.v2.models.BulkMutation; @@ -93,8 +98,13 @@ import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerStreamingCallable; import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerUnaryCallable; import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracerFactory; +import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsView; import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory; +import com.google.cloud.bigtable.data.v2.stub.metrics.CustomOpenTelemetryMetricsProvider; +import com.google.cloud.bigtable.data.v2.stub.metrics.DefaultMetricsProvider; +import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsProvider; import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsTracerFactory; +import com.google.cloud.bigtable.data.v2.stub.metrics.NoopMetricsProvider; import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants; import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersServerStreamingCallable; import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersUnaryCallable; @@ -123,6 +133,11 @@ import io.opencensus.tags.TagValue; import io.opencensus.tags.Tagger; import io.opencensus.tags.Tags; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; @@ -241,7 +256,7 @@ public static ClientContext 
createClientContext(EnhancedBigtableStubSettings set } public static ApiTracerFactory createBigtableTracerFactory( - EnhancedBigtableStubSettings settings, ClientContext clientContext) { + EnhancedBigtableStubSettings settings, ClientContext clientContext) throws IOException { return createBigtableTracerFactory( settings, Tags.getTagger(), Stats.getStatsRecorder(), clientContext); } @@ -251,7 +266,8 @@ public static ApiTracerFactory createBigtableTracerFactory( EnhancedBigtableStubSettings settings, Tagger tagger, StatsRecorder stats, - ClientContext clientContext) { + ClientContext clientContext) + throws IOException { String projectId = settings.getProjectId(); String instanceId = settings.getInstanceId(); String appProfileId = settings.getAppProfileId(); @@ -262,16 +278,11 @@ public static ApiTracerFactory createBigtableTracerFactory( .put(RpcMeasureConstants.BIGTABLE_INSTANCE_ID, TagValue.create(instanceId)) .put(RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID, TagValue.create(appProfileId)) .build(); - ImmutableMap builtinAttributes = - ImmutableMap.builder() - .put("project_id", projectId) - .put("instance", instanceId) - .put("app_profile", appProfileId) - .build(); ImmutableList.Builder tracerFactories = ImmutableList.builder(); tracerFactories .add( + // Add OpenCensus Tracing new OpencensusTracerFactory( ImmutableMap.builder() // Annotate traces with the same tags as metrics @@ -285,13 +296,47 @@ public static ApiTracerFactory createBigtableTracerFactory( .build())) // Add OpenCensus Metrics .add(MetricsTracerFactory.create(tagger, stats, attributes)) - .add(BuiltinMetricsTracerFactory.create(builtinAttributes)) // Add user configured tracer .add(settings.getTracerFactory()); - + Attributes otelAttributes = + Attributes.of( + PROJECT_ID_KEY, projectId, INSTANCE_ID_KEY, instanceId, APP_PROFILE_KEY, appProfileId); + BuiltinMetricsTracerFactory builtinMetricsTracerFactory = + createBuiltinMetricsTracerFactory( + projectId, settings.getMetricsProvider(), otelAttributes, clientContext); + if (builtinMetricsTracerFactory != null) { + tracerFactories.add(builtinMetricsTracerFactory); + } return new CompositeTracerFactory(tracerFactories.build()); } + private static BuiltinMetricsTracerFactory createBuiltinMetricsTracerFactory( + String projectId, + MetricsProvider metricsProvider, + Attributes attributes, + ClientContext clientContext) + throws IOException { + if (metricsProvider instanceof CustomOpenTelemetryMetricsProvider) { + CustomOpenTelemetryMetricsProvider customMetricsProvider = + (CustomOpenTelemetryMetricsProvider) metricsProvider; + return BuiltinMetricsTracerFactory.create( + customMetricsProvider.getOpenTelemetry(), attributes); + } else if (metricsProvider instanceof DefaultMetricsProvider) { + SdkMeterProviderBuilder meterProvider = SdkMeterProvider.builder(); + Credentials credentials = + BigtableDataSettings.getMetricsCredentials() != null + ? 
BigtableDataSettings.getMetricsCredentials() + : clientContext.getCredentials(); + BuiltinMetricsView.registerBuiltinMetrics(projectId, credentials, meterProvider); + OpenTelemetry openTelemetry = + OpenTelemetrySdk.builder().setMeterProvider(meterProvider.build()).build(); + return BuiltinMetricsTracerFactory.create(openTelemetry, attributes); + } else if (metricsProvider instanceof NoopMetricsProvider) { + return null; + } + throw new IOException("Invalid MetricsProvider type " + metricsProvider); + } + private static void patchCredentials(EnhancedBigtableStubSettings.Builder settings) throws IOException { int i = settings.getEndpoint().lastIndexOf(":"); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index 44e4752cd5..dd8156d5cd 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -44,6 +44,8 @@ import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow; import com.google.cloud.bigtable.data.v2.models.Row; import com.google.cloud.bigtable.data.v2.models.RowMutation; +import com.google.cloud.bigtable.data.v2.stub.metrics.DefaultMetricsProvider; +import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsProvider; import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor; import com.google.common.base.MoreObjects; @@ -229,6 +231,8 @@ public class EnhancedBigtableStubSettings extends StubSettings getJwtAudienceMapping() { return jwtAudienceMapping; } + public MetricsProvider getMetricsProvider() { + return metricsProvider; + } + /** * Gets if routing cookie is enabled. If true, client will retry a request with extra metadata * server sent back. @@ -636,6 +645,8 @@ public static class Builder extends StubSettings.Builder jwtAudienceMapping) { return this; } + /** + * Sets the {@link MetricsProvider}. + * + *

<p>By default, this is set to {@link + * com.google.cloud.bigtable.data.v2.stub.metrics.DefaultMetricsProvider#INSTANCE} which will + * collect and export client side metrics. + * + *

<p>To disable client side metrics, set it to {@link + * com.google.cloud.bigtable.data.v2.stub.metrics.NoopMetricsProvider#INSTANCE}. + * + *

To use a custom OpenTelemetry instance, refer to {@link + * com.google.cloud.bigtable.data.v2.stub.metrics.CustomOpenTelemetryMetricsProvider} on how to + * set it up. + */ + public Builder setMetricsProvider(MetricsProvider metricsProvider) { + this.metricsProvider = Preconditions.checkNotNull(metricsProvider); + return this; + } + + /** Gets the {@link MetricsProvider}. */ + public MetricsProvider getMetricsProvider() { + return this.metricsProvider; + } + @InternalApi("Used for internal testing") public Map getJwtAudienceMapping() { return jwtAudienceMapping; @@ -1082,6 +1120,7 @@ public String toString() { generateInitialChangeStreamPartitionsSettings) .add("readChangeStreamSettings", readChangeStreamSettings) .add("pingAndWarmSettings", pingAndWarmSettings) + .add("metricsProvider", metricsProvider) .add("parent", super.toString()) .toString(); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java index 6208fce89e..97cc2f73ec 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java @@ -86,7 +86,7 @@ public void call( stopwatch.stop(); if (context.getTracer() instanceof BigtableTracer) { ((BigtableTracer) context.getTracer()) - .batchRequestThrottled(stopwatch.elapsed(TimeUnit.MILLISECONDS)); + .batchRequestThrottled(stopwatch.elapsed(TimeUnit.NANOSECONDS)); } RateLimitingResponseObserver innerObserver = new RateLimitingResponseObserver(limiter, lastQpsChangeTime, responseObserver); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableCloudMonitoringExporter.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableCloudMonitoringExporter.java index 5ca8271791..019fe88a50 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableCloudMonitoringExporter.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableCloudMonitoringExporter.java @@ -19,11 +19,13 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; +import com.google.api.core.InternalApi; import com.google.api.gax.core.FixedCredentialsProvider; import com.google.auth.Credentials; import com.google.cloud.monitoring.v3.MetricServiceClient; import com.google.cloud.monitoring.v3.MetricServiceSettings; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; import com.google.common.util.concurrent.MoreExecutors; import com.google.monitoring.v3.CreateTimeSeriesRequest; import com.google.monitoring.v3.ProjectName; @@ -49,10 +51,19 @@ *

The exporter will look for all bigtable owned metrics under bigtable.googleapis.com * instrumentation scope and upload it via the Google Cloud Monitoring API. */ -final class BigtableCloudMonitoringExporter implements MetricExporter { +@InternalApi +public final class BigtableCloudMonitoringExporter implements MetricExporter { private static final Logger logger = Logger.getLogger(BigtableCloudMonitoringExporter.class.getName()); + + // This system property can be used to override the monitoring endpoint + // to a different environment. It's meant for internal testing only. + private static final String MONITORING_ENDPOINT = + MoreObjects.firstNonNull( + System.getProperty("bigtable.test-monitoring-endpoint"), + MetricServiceSettings.getDefaultEndpoint()); + private final MetricServiceClient client; private final String projectId; @@ -64,10 +75,11 @@ final class BigtableCloudMonitoringExporter implements MetricExporter { private CompletableResultCode lastExportCode; - static BigtableCloudMonitoringExporter create(String projectId, Credentials credentials) + public static BigtableCloudMonitoringExporter create(String projectId, Credentials credentials) throws IOException { MetricServiceSettings.Builder settingsBuilder = MetricServiceSettings.newBuilder(); settingsBuilder.setCredentialsProvider(FixedCredentialsProvider.create(credentials)); + settingsBuilder.setEndpoint(MONITORING_ENDPOINT); org.threeten.bp.Duration timeout = Duration.ofMinutes(1); // TODO: createServiceTimeSeries needs special handling if the request failed. Leaving diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableExporterUtils.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableExporterUtils.java index 7c3dc09fc4..e3cf50b070 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableExporterUtils.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableExporterUtils.java @@ -25,12 +25,12 @@ import static com.google.api.MetricDescriptor.ValueType.DISTRIBUTION; import static com.google.api.MetricDescriptor.ValueType.DOUBLE; import static com.google.api.MetricDescriptor.ValueType.INT64; -import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsAttributes.CLIENT_UID; -import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsAttributes.CLUSTER_ID; -import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsAttributes.INSTANCE_ID; -import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsAttributes.PROJECT_ID; -import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsAttributes.TABLE_ID; -import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsAttributes.ZONE_ID; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.CLIENT_UID_KEY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.CLUSTER_ID_KEY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.INSTANCE_ID_KEY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.PROJECT_ID_KEY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.TABLE_ID_KEY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.ZONE_ID_KEY; import com.google.api.Distribution; import com.google.api.Metric; @@ -70,7 
+70,7 @@ class BigtableExporterUtils { // These metric labels will be promoted to the bigtable_table monitored resource fields private static final Set> PROMOTED_RESOURCE_LABELS = - ImmutableSet.of(PROJECT_ID, INSTANCE_ID, TABLE_ID, CLUSTER_ID, ZONE_ID); + ImmutableSet.of(PROJECT_ID_KEY, INSTANCE_ID_KEY, TABLE_ID_KEY, CLUSTER_ID_KEY, ZONE_ID_KEY); private BigtableExporterUtils() {} @@ -96,7 +96,7 @@ static String getDefaultTaskValue() { } static String getProjectId(PointData pointData) { - return pointData.getAttributes().get(PROJECT_ID); + return pointData.getAttributes().get(PROJECT_ID_KEY); } static List convertCollectionToListOfTimeSeries( @@ -104,9 +104,11 @@ static List convertCollectionToListOfTimeSeries( List allTimeSeries = new ArrayList<>(); for (MetricData metricData : collection) { - // TODO: scope will be defined in BuiltinMetricsConstants. Update this field in the following - // PR. - if (!metricData.getInstrumentationScopeInfo().getName().equals("bigtable.googleapis.com")) { + if (!metricData + .getInstrumentationScopeInfo() + .getName() + .equals(BuiltinMetricsConstants.METER_NAME)) { + // Filter out metric data for instruments that are not part of the bigtable builtin metrics continue; } metricData.getData().getPoints().stream() @@ -135,7 +137,7 @@ private static TimeSeries convertPointToTimeSeries( metricBuilder.putLabels(key.getKey(), String.valueOf(attributes.get(key))); } } - metricBuilder.putLabels(CLIENT_UID.getKey(), taskId); + metricBuilder.putLabels(CLIENT_UID_KEY.getKey(), taskId); TimeSeries.Builder builder = TimeSeries.newBuilder() diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java index 1cda49934c..3b2242385a 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java @@ -42,7 +42,7 @@ public void streamCreated(Attributes transportAttrs, Metadata headers) { @Override public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) { - tracer.grpcChannelQueuedLatencies(stopwatch.elapsed(TimeUnit.MILLISECONDS)); + tracer.grpcChannelQueuedLatencies(stopwatch.elapsed(TimeUnit.NANOSECONDS)); } static class Factory extends ClientStreamTracer.Factory { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsAttributes.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsAttributes.java deleted file mode 100644 index e34659444b..0000000000 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsAttributes.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright 2023 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.bigtable.data.v2.stub.metrics; - -import io.opentelemetry.api.common.AttributeKey; - -class BuiltinMetricsAttributes { - - static final AttributeKey PROJECT_ID = AttributeKey.stringKey("project_id"); - static final AttributeKey INSTANCE_ID = AttributeKey.stringKey("instance"); - static final AttributeKey TABLE_ID = AttributeKey.stringKey("table"); - static final AttributeKey CLUSTER_ID = AttributeKey.stringKey("cluster"); - static final AttributeKey ZONE_ID = AttributeKey.stringKey("zone"); - - static final AttributeKey APP_PROFILE = AttributeKey.stringKey("app_profile"); - static final AttributeKey STREAMING = AttributeKey.booleanKey("streaming"); - static final AttributeKey METHOD = AttributeKey.stringKey("method"); - static final AttributeKey STATUS = AttributeKey.stringKey("status"); - static final AttributeKey CLIENT_NAME = AttributeKey.stringKey("client_name"); - static final AttributeKey CLIENT_UID = AttributeKey.stringKey("client_uid"); -} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsConstants.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsConstants.java new file mode 100644 index 0000000000..807191b7e2 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsConstants.java @@ -0,0 +1,177 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import com.google.api.core.InternalApi; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.sdk.metrics.Aggregation; +import io.opentelemetry.sdk.metrics.InstrumentSelector; +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.View; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** Defining Bigtable builit-in metrics scope, attributes, metric names and views. */ +@InternalApi +public class BuiltinMetricsConstants { + + // Metric attribute keys for monitored resource + public static final AttributeKey PROJECT_ID_KEY = AttributeKey.stringKey("project_id"); + public static final AttributeKey INSTANCE_ID_KEY = AttributeKey.stringKey("instance"); + public static final AttributeKey TABLE_ID_KEY = AttributeKey.stringKey("table"); + public static final AttributeKey CLUSTER_ID_KEY = AttributeKey.stringKey("cluster"); + public static final AttributeKey ZONE_ID_KEY = AttributeKey.stringKey("zone"); + + // Metric attribute keys for labels + // We need to access APP_PROFILE_KEY in EnhancedBigtableStubSettings and STREAMING_KEY in + // IT tests, so they're public. 
+ public static final AttributeKey APP_PROFILE_KEY = AttributeKey.stringKey("app_profile"); + public static final AttributeKey STREAMING_KEY = AttributeKey.booleanKey("streaming"); + static final AttributeKey METHOD_KEY = AttributeKey.stringKey("method"); + static final AttributeKey STATUS_KEY = AttributeKey.stringKey("status"); + static final AttributeKey CLIENT_NAME_KEY = AttributeKey.stringKey("client_name"); + static final AttributeKey CLIENT_UID_KEY = AttributeKey.stringKey("client_uid"); + + // Metric names + public static final String OPERATION_LATENCIES_NAME = "operation_latencies"; + static final String ATTEMPT_LATENCIES_NAME = "attempt_latencies"; + static final String RETRY_COUNT_NAME = "retry_count"; + static final String CONNECTIVITY_ERROR_COUNT_NAME = "connectivity_error_count"; + static final String SERVER_LATENCIES_NAME = "server_latencies"; + static final String FIRST_RESPONSE_LATENCIES_NAME = "first_response_latencies"; + static final String APPLICATION_BLOCKING_LATENCIES_NAME = "application_latencies"; + static final String CLIENT_BLOCKING_LATENCIES_NAME = "throttling_latencies"; + + // Buckets under 100,000 are identical to buckets for server side metrics handler_latencies. + // Extending client side bucket to up to 3,200,000. + private static final Aggregation AGGREGATION_WITH_MILLIS_HISTOGRAM = + Aggregation.explicitBucketHistogram( + ImmutableList.of( + 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 8.0, 10.0, 13.0, 16.0, 20.0, 25.0, 30.0, 40.0, + 50.0, 65.0, 80.0, 100.0, 130.0, 160.0, 200.0, 250.0, 300.0, 400.0, 500.0, 650.0, + 800.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0, 100000.0, 200000.0, + 400000.0, 800000.0, 1600000.0, 3200000.0)); // max is 53.3 minutes + + public static final String METER_NAME = "bigtable.googleapis.com/internal/client/"; + + static final Set COMMON_ATTRIBUTES = + ImmutableSet.of( + PROJECT_ID_KEY, + INSTANCE_ID_KEY, + TABLE_ID_KEY, + APP_PROFILE_KEY, + CLUSTER_ID_KEY, + ZONE_ID_KEY, + METHOD_KEY, + CLIENT_NAME_KEY); + + static void defineView( + ImmutableMap.Builder viewMap, + String id, + Aggregation aggregation, + InstrumentType type, + String unit, + Set extraAttributes) { + InstrumentSelector selector = + InstrumentSelector.builder() + .setName(id) + .setMeterName(METER_NAME) + .setType(type) + .setUnit(unit) + .build(); + Set attributesFilter = + ImmutableSet.builder() + .addAll( + COMMON_ATTRIBUTES.stream().map(AttributeKey::getKey).collect(Collectors.toSet())) + .addAll(extraAttributes.stream().map(AttributeKey::getKey).collect(Collectors.toSet())) + .build(); + View view = + View.builder() + .setName(METER_NAME + id) + .setAggregation(aggregation) + .setAttributeFilter(attributesFilter) + .build(); + + viewMap.put(selector, view); + } + + public static Map getAllViews() { + ImmutableMap.Builder views = ImmutableMap.builder(); + + defineView( + views, + OPERATION_LATENCIES_NAME, + AGGREGATION_WITH_MILLIS_HISTOGRAM, + InstrumentType.HISTOGRAM, + "ms", + ImmutableSet.of(STREAMING_KEY, STATUS_KEY)); + defineView( + views, + ATTEMPT_LATENCIES_NAME, + AGGREGATION_WITH_MILLIS_HISTOGRAM, + InstrumentType.HISTOGRAM, + "ms", + ImmutableSet.of(STREAMING_KEY, STATUS_KEY)); + defineView( + views, + SERVER_LATENCIES_NAME, + AGGREGATION_WITH_MILLIS_HISTOGRAM, + InstrumentType.HISTOGRAM, + "ms", + ImmutableSet.of(STATUS_KEY)); + defineView( + views, + FIRST_RESPONSE_LATENCIES_NAME, + AGGREGATION_WITH_MILLIS_HISTOGRAM, + InstrumentType.HISTOGRAM, + "ms", + ImmutableSet.of(STATUS_KEY)); + defineView( + views, + APPLICATION_BLOCKING_LATENCIES_NAME, + 
AGGREGATION_WITH_MILLIS_HISTOGRAM, + InstrumentType.HISTOGRAM, + "ms", + ImmutableSet.of()); + defineView( + views, + CLIENT_BLOCKING_LATENCIES_NAME, + AGGREGATION_WITH_MILLIS_HISTOGRAM, + InstrumentType.HISTOGRAM, + "ms", + ImmutableSet.of()); + defineView( + views, + RETRY_COUNT_NAME, + Aggregation.sum(), + InstrumentType.COUNTER, + "1", + ImmutableSet.of(STATUS_KEY)); + defineView( + views, + CONNECTIVITY_ERROR_COUNT_NAME, + Aggregation.sum(), + InstrumentType.COUNTER, + "1", + ImmutableSet.of(STATUS_KEY)); + + return views.build(); + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java index 2d8262a93e..6bbed1363d 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java @@ -16,13 +16,21 @@ package com.google.cloud.bigtable.data.v2.stub.metrics; import static com.google.api.gax.tracing.ApiTracerFactory.OperationType; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.CLIENT_NAME_KEY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.CLUSTER_ID_KEY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.METHOD_KEY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.STATUS_KEY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.STREAMING_KEY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.TABLE_ID_KEY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.ZONE_ID_KEY; import com.google.api.gax.retrying.ServerStreamingAttemptException; import com.google.api.gax.tracing.SpanName; -import com.google.cloud.bigtable.stats.StatsRecorderWrapper; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import com.google.common.math.IntMath; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.LongCounter; import java.util.concurrent.CancellationException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -37,8 +45,7 @@ */ class BuiltinMetricsTracer extends BigtableTracer { - private final StatsRecorderWrapper recorder; - + private static final String NAME = "java-bigtable"; private final OperationType operationType; private final SpanName spanName; @@ -64,21 +71,56 @@ class BuiltinMetricsTracer extends BigtableTracer { private boolean flowControlIsDisabled = false; - private AtomicInteger requestLeft = new AtomicInteger(0); + private final AtomicInteger requestLeft = new AtomicInteger(0); // Monitored resource labels private String tableId = "unspecified"; private String zone = "global"; private String cluster = "unspecified"; - private AtomicLong totalClientBlockingTime = new AtomicLong(0); + private final AtomicLong totalClientBlockingTime = new AtomicLong(0); + + private final Attributes baseAttributes; + + private Long serverLatencies = null; + + // OpenCensus (and server) histogram buckets use [start, end), however OpenTelemetry uses (start, + // end]. 
To work around this, we measure all the latencies in nanoseconds and convert them + // to milliseconds and use DoubleHistogram. This should minimize the chance of a data + // point fall on the bucket boundary that causes off by one errors. + private final DoubleHistogram operationLatenciesHistogram; + private final DoubleHistogram attemptLatenciesHistogram; + private final DoubleHistogram serverLatenciesHistogram; + private final DoubleHistogram firstResponseLatenciesHistogram; + private final DoubleHistogram clientBlockingLatenciesHistogram; + private final DoubleHistogram applicationBlockingLatenciesHistogram; + private final LongCounter connectivityErrorCounter; + private final LongCounter retryCounter; - @VisibleForTesting BuiltinMetricsTracer( - OperationType operationType, SpanName spanName, StatsRecorderWrapper recorder) { + OperationType operationType, + SpanName spanName, + Attributes attributes, + DoubleHistogram operationLatenciesHistogram, + DoubleHistogram attemptLatenciesHistogram, + DoubleHistogram serverLatenciesHistogram, + DoubleHistogram firstResponseLatenciesHistogram, + DoubleHistogram clientBlockingLatenciesHistogram, + DoubleHistogram applicationBlockingLatenciesHistogram, + LongCounter connectivityErrorCounter, + LongCounter retryCounter) { this.operationType = operationType; this.spanName = spanName; - this.recorder = recorder; + this.baseAttributes = attributes; + + this.operationLatenciesHistogram = operationLatenciesHistogram; + this.attemptLatenciesHistogram = attemptLatenciesHistogram; + this.serverLatenciesHistogram = serverLatenciesHistogram; + this.firstResponseLatenciesHistogram = firstResponseLatenciesHistogram; + this.clientBlockingLatenciesHistogram = clientBlockingLatenciesHistogram; + this.applicationBlockingLatenciesHistogram = applicationBlockingLatenciesHistogram; + this.connectivityErrorCounter = connectivityErrorCounter; + this.retryCounter = retryCounter; } @Override @@ -203,13 +245,8 @@ public int getAttempt() { @Override public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwable) { - // Record the metrics and put in the map after the attempt is done, so we can have cluster and - // zone information if (latency != null) { - recorder.putGfeLatencies(latency); - recorder.putGfeMissingHeaders(0); - } else { - recorder.putGfeMissingHeaders(1); + serverLatencies = latency; } } @@ -220,13 +257,13 @@ public void setLocations(String zone, String cluster) { } @Override - public void batchRequestThrottled(long throttledTimeMs) { - totalClientBlockingTime.addAndGet(throttledTimeMs); + public void batchRequestThrottled(long throttledTimeNanos) { + totalClientBlockingTime.addAndGet(Duration.ofNanos(throttledTimeNanos).toMillis()); } @Override - public void grpcChannelQueuedLatencies(long queuedTimeMs) { - totalClientBlockingTime.addAndGet(queuedTimeMs); + public void grpcChannelQueuedLatencies(long queuedTimeNanos) { + totalClientBlockingTime.addAndGet(queuedTimeNanos); } @Override @@ -239,26 +276,43 @@ private void recordOperationCompletion(@Nullable Throwable status) { return; } operationTimer.stop(); - long operationLatency = operationTimer.elapsed(TimeUnit.MILLISECONDS); + + boolean isStreaming = operationType == OperationType.ServerStreaming; + String statusStr = Util.extractStatus(status); + + // Publish metric data with all the attributes. The attributes get filtered in + // BuiltinMetricsConstants when we construct the views. 
+ Attributes attributes = + baseAttributes + .toBuilder() + .put(TABLE_ID_KEY, tableId) + .put(CLUSTER_ID_KEY, cluster) + .put(ZONE_ID_KEY, zone) + .put(METHOD_KEY, spanName.toString()) + .put(CLIENT_NAME_KEY, NAME) + .put(STREAMING_KEY, isStreaming) + .put(STATUS_KEY, statusStr) + .build(); + long operationLatencyNano = operationTimer.elapsed(TimeUnit.NANOSECONDS); // Only record when retry count is greater than 0 so the retry // graph will be less confusing if (attemptCount > 1) { - recorder.putRetryCount(attemptCount - 1); + retryCounter.add(attemptCount - 1, attributes); } + operationLatenciesHistogram.record(convertToMs(operationLatencyNano), attributes); + // serverLatencyTimer should already be stopped in recordAttemptCompletion - recorder.putOperationLatencies(operationLatency); - recorder.putApplicationLatencies( - Duration.ofNanos(operationLatencyNano - totalServerLatencyNano.get()).toMillis()); + long applicationLatencyNano = operationLatencyNano - totalServerLatencyNano.get(); + applicationBlockingLatenciesHistogram.record(convertToMs(applicationLatencyNano), attributes); if (operationType == OperationType.ServerStreaming && spanName.getMethodName().equals("ReadRows")) { - recorder.putFirstResponseLatencies(firstResponsePerOpTimer.elapsed(TimeUnit.MILLISECONDS)); + firstResponseLatenciesHistogram.record( + convertToMs(firstResponsePerOpTimer.elapsed(TimeUnit.NANOSECONDS)), attributes); } - - recorder.recordOperation(Util.extractStatus(status), tableId, zone, cluster); } private void recordAttemptCompletion(@Nullable Throwable status) { @@ -273,8 +327,7 @@ private void recordAttemptCompletion(@Nullable Throwable status) { } } - // Make sure to reset the blocking time after recording it for the next attempt - recorder.putClientBlockingLatencies(totalClientBlockingTime.getAndSet(0)); + boolean isStreaming = operationType == OperationType.ServerStreaming; // Patch the status until it's fixed in gax. When an attempt failed, // it'll throw a ServerStreamingAttemptException. 
Unwrap the exception @@ -283,7 +336,35 @@ private void recordAttemptCompletion(@Nullable Throwable status) { status = status.getCause(); } - recorder.putAttemptLatencies(attemptTimer.elapsed(TimeUnit.MILLISECONDS)); - recorder.recordAttempt(Util.extractStatus(status), tableId, zone, cluster); + String statusStr = Util.extractStatus(status); + + Attributes attributes = + baseAttributes + .toBuilder() + .put(TABLE_ID_KEY, tableId) + .put(CLUSTER_ID_KEY, cluster) + .put(ZONE_ID_KEY, zone) + .put(METHOD_KEY, spanName.toString()) + .put(CLIENT_NAME_KEY, NAME) + .put(STREAMING_KEY, isStreaming) + .put(STATUS_KEY, statusStr) + .build(); + + clientBlockingLatenciesHistogram.record(convertToMs(totalClientBlockingTime.get()), attributes); + + attemptLatenciesHistogram.record( + convertToMs(attemptTimer.elapsed(TimeUnit.NANOSECONDS)), attributes); + + if (serverLatencies != null) { + serverLatenciesHistogram.record(serverLatencies, attributes); + connectivityErrorCounter.add(0, attributes); + } else { + connectivityErrorCounter.add(1, attributes); + } + } + + private static double convertToMs(long nanoSeconds) { + double toMs = 1e-6; + return nanoSeconds * toMs; } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerFactory.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerFactory.java index 794997071d..f0ac656978 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerFactory.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerFactory.java @@ -15,29 +15,112 @@ */ package com.google.cloud.bigtable.data.v2.stub.metrics; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.APPLICATION_BLOCKING_LATENCIES_NAME; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.ATTEMPT_LATENCIES_NAME; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.CLIENT_BLOCKING_LATENCIES_NAME; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.CONNECTIVITY_ERROR_COUNT_NAME; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.FIRST_RESPONSE_LATENCIES_NAME; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.METER_NAME; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.OPERATION_LATENCIES_NAME; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.RETRY_COUNT_NAME; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.SERVER_LATENCIES_NAME; + import com.google.api.core.InternalApi; import com.google.api.gax.tracing.ApiTracer; import com.google.api.gax.tracing.ApiTracerFactory; import com.google.api.gax.tracing.BaseApiTracerFactory; import com.google.api.gax.tracing.SpanName; -import com.google.cloud.bigtable.stats.StatsWrapper; -import com.google.common.collect.ImmutableMap; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.Meter; +import java.io.IOException; /** - * {@link ApiTracerFactory} that will generate OpenCensus metrics by using the {@link ApiTracer} + * {@link ApiTracerFactory} that will generate 
OpenTelemetry metrics by using the {@link ApiTracer} * api. */ @InternalApi("For internal use only") public class BuiltinMetricsTracerFactory extends BaseApiTracerFactory { - private final ImmutableMap statsAttributes; + private final Attributes attributes; + + private static final String MILLISECOND = "ms"; + private static final String COUNT = "1"; - public static BuiltinMetricsTracerFactory create(ImmutableMap statsAttributes) { - return new BuiltinMetricsTracerFactory(statsAttributes); + private final DoubleHistogram operationLatenciesHistogram; + private final DoubleHistogram attemptLatenciesHistogram; + private final DoubleHistogram serverLatenciesHistogram; + private final DoubleHistogram firstResponseLatenciesHistogram; + private final DoubleHistogram clientBlockingLatenciesHistogram; + private final DoubleHistogram applicationBlockingLatenciesHistogram; + private final LongCounter connectivityErrorCounter; + private final LongCounter retryCounter; + + public static BuiltinMetricsTracerFactory create( + OpenTelemetry openTelemetry, Attributes attributes) throws IOException { + return new BuiltinMetricsTracerFactory(openTelemetry, attributes); } - private BuiltinMetricsTracerFactory(ImmutableMap statsAttributes) { - this.statsAttributes = statsAttributes; + BuiltinMetricsTracerFactory(OpenTelemetry openTelemetry, Attributes attributes) { + this.attributes = attributes; + Meter meter = openTelemetry.getMeter(METER_NAME); + + operationLatenciesHistogram = + meter + .histogramBuilder(OPERATION_LATENCIES_NAME) + .setDescription( + "Total time until final operation success or failure, including retries and backoff.") + .setUnit(MILLISECOND) + .build(); + attemptLatenciesHistogram = + meter + .histogramBuilder(ATTEMPT_LATENCIES_NAME) + .setDescription("Client observed latency per RPC attempt.") + .setUnit(MILLISECOND) + .build(); + serverLatenciesHistogram = + meter + .histogramBuilder(SERVER_LATENCIES_NAME) + .setDescription( + "The latency measured from the moment that the RPC entered the Google data center until the RPC was completed.") + .setUnit(MILLISECOND) + .build(); + firstResponseLatenciesHistogram = + meter + .histogramBuilder(FIRST_RESPONSE_LATENCIES_NAME) + .setDescription( + "Latency from operation start until the response headers were received. The publishing of the measurement will be delayed until the attempt response has been received.") + .setUnit(MILLISECOND) + .build(); + clientBlockingLatenciesHistogram = + meter + .histogramBuilder(CLIENT_BLOCKING_LATENCIES_NAME) + .setDescription( + "The artificial latency introduced by the client to limit the number of outstanding requests. The publishing of the measurement will be delayed until the attempt trailers have been received.") + .setUnit(MILLISECOND) + .build(); + applicationBlockingLatenciesHistogram = + meter + .histogramBuilder(APPLICATION_BLOCKING_LATENCIES_NAME) + .setDescription( + "The latency of the client application consuming available response data.") + .setUnit(MILLISECOND) + .build(); + connectivityErrorCounter = + meter + .counterBuilder(CONNECTIVITY_ERROR_COUNT_NAME) + .setDescription( + "Number of requests that failed to reach the Google datacenter. 
(Requests without google response headers") + .setUnit(COUNT) + .build(); + retryCounter = + meter + .counterBuilder(RETRY_COUNT_NAME) + .setDescription("The number of additional RPCs sent after the initial attempt.") + .setUnit(COUNT) + .build(); } @Override @@ -45,6 +128,14 @@ public ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType op return new BuiltinMetricsTracer( operationType, spanName, - StatsWrapper.createRecorder(operationType, spanName, statsAttributes)); + attributes, + operationLatenciesHistogram, + attemptLatenciesHistogram, + serverLatenciesHistogram, + firstResponseLatenciesHistogram, + clientBlockingLatenciesHistogram, + applicationBlockingLatenciesHistogram, + connectivityErrorCounter, + retryCounter); } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsView.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsView.java new file mode 100644 index 0000000000..8c7c552841 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsView.java @@ -0,0 +1,58 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import com.google.auth.Credentials; +import com.google.auth.oauth2.GoogleCredentials; +import io.opentelemetry.sdk.metrics.InstrumentSelector; +import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; +import io.opentelemetry.sdk.metrics.View; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; +import java.io.IOException; +import java.util.Map; + +/** + * A util class to register built-in metrics on a custom OpenTelemetry instance. This is for + * advanced usage, and is only necessary when wanting to write built-in metrics to cloud monitoring + * and custom sinks. Please refer to {@link CustomOpenTelemetryMetricsProvider} for example usage. + */ +public class BuiltinMetricsView { + + private BuiltinMetricsView() {} + + /** + * Register built-in metrics on the {@link SdkMeterProviderBuilder} with application default + * credentials. + */ + public static void registerBuiltinMetrics(String projectId, SdkMeterProviderBuilder builder) + throws IOException { + BuiltinMetricsView.registerBuiltinMetrics( + projectId, GoogleCredentials.getApplicationDefault(), builder); + } + + /** Register built-in metrics on the {@link SdkMeterProviderBuilder} with credentials. 
*/ + public static void registerBuiltinMetrics( + String projectId, Credentials credentials, SdkMeterProviderBuilder builder) + throws IOException { + MetricExporter metricExporter = BigtableCloudMonitoringExporter.create(projectId, credentials); + for (Map.Entry entry : + BuiltinMetricsConstants.getAllViews().entrySet()) { + builder.registerView(entry.getKey(), entry.getValue()); + } + builder.registerMetricReader(PeriodicMetricReader.create(metricExporter)); + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CustomOpenTelemetryMetricsProvider.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CustomOpenTelemetryMetricsProvider.java new file mode 100644 index 0000000000..bcd1fe488f --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CustomOpenTelemetryMetricsProvider.java @@ -0,0 +1,69 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import io.opentelemetry.api.OpenTelemetry; + +/** + * Set a custom OpenTelemetry instance. + * + *

<p>To register client side metrics on the custom OpenTelemetry: + * + *

<pre>{@code
+ * SdkMeterProviderBuilder sdkMeterProvider = SdkMeterProvider.builder();
+ *
+ * // register Builtin metrics on your meter provider
+ * BuiltinMetricsView.registerBuiltinMetrics("project-id", sdkMeterProvider);
+ *
+ * // register other metric readers and views
+ * sdkMeterProvider.registerMetricReader(..);
+ * sdkMeterProvider.registerView(..);
+ *
+ * // create the OTEL instance
+ * OpenTelemetry openTelemetry = OpenTelemetrySdk
+ *     .builder()
+ *     .setMeterProvider(sdkMeterProvider.build())
+ *     .build();
+ *
+ * // Override MetricsProvider in BigtableDataSettings
+ * BigtableDataSettings settings = BigtableDataSettings.newBuilder()
+ *   .setProjectId("my-project")
+ *   .setInstanceId("my-instance-id")
+ *   .setMetricsProvider(CustomOpenTelemetryMetricsProvider.create(openTelemetry))
+ *   .build();
+ * }</pre>
+ */ +public final class CustomOpenTelemetryMetricsProvider implements MetricsProvider { + + private final OpenTelemetry otel; + + public static CustomOpenTelemetryMetricsProvider create(OpenTelemetry otel) { + return new CustomOpenTelemetryMetricsProvider(otel); + } + + private CustomOpenTelemetryMetricsProvider(OpenTelemetry otel) { + this.otel = otel; + } + + public OpenTelemetry getOpenTelemetry() { + return otel; + } + + @Override + public String toString() { + return otel.toString(); + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/DefaultMetricsProvider.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/DefaultMetricsProvider.java new file mode 100644 index 0000000000..0f3ee0c98f --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/DefaultMetricsProvider.java @@ -0,0 +1,35 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +/** + * Set {@link + * com.google.cloud.bigtable.data.v2.BigtableDataSettings.Builder#setMetricsProvider(MetricsProvider)}, + * to {@link this#INSTANCE} to enable collecting and export client side metrics + * https://cloud.google.com/bigtable/docs/client-side-metrics. This is the default setting in {@link + * com.google.cloud.bigtable.data.v2.BigtableDataSettings}. + */ +public final class DefaultMetricsProvider implements MetricsProvider { + + public static DefaultMetricsProvider INSTANCE = new DefaultMetricsProvider(); + + private DefaultMetricsProvider() {} + + @Override + public String toString() { + return "DefaultMetricsProvider"; + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsProvider.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsProvider.java new file mode 100644 index 0000000000..251bb41619 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsProvider.java @@ -0,0 +1,25 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import com.google.api.core.InternalExtensionOnly; + +/** + * Provide client side metrics https://cloud.google.com/bigtable/docs/client-side-metrics + * implementations. 
+ */ +@InternalExtensionOnly +public interface MetricsProvider {} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/NoopMetricsProvider.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/NoopMetricsProvider.java new file mode 100644 index 0000000000..7b5bcbc50a --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/NoopMetricsProvider.java @@ -0,0 +1,34 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +/** + * Set {@link + * com.google.cloud.bigtable.data.v2.BigtableDataSettings.Builder#setMetricsProvider(MetricsProvider)}, + * to {@link this#INSTANCE} to disable collecting and export client side metrics + * https://cloud.google.com/bigtable/docs/client-side-metrics. + */ +public final class NoopMetricsProvider implements MetricsProvider { + + public static NoopMetricsProvider INSTANCE = new NoopMetricsProvider(); + + private NoopMetricsProvider() {} + + @Override + public String toString() { + return "NoopMetricsProvider"; + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/TracedBatcherUnaryCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/TracedBatcherUnaryCallable.java index b7140f0156..ce73d75dc1 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/TracedBatcherUnaryCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/TracedBatcherUnaryCallable.java @@ -21,6 +21,7 @@ import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.UnaryCallable; import com.google.api.gax.tracing.ApiTracer; +import org.threeten.bp.Duration; /** * This callable will extract total throttled time from {@link ApiCallContext} and add it to {@link @@ -42,7 +43,8 @@ public ApiFuture futureCall(RequestT request, ApiCallContext context) // this should always be true if (tracer instanceof BigtableTracer) { ((BigtableTracer) tracer) - .batchRequestThrottled(context.getOption(Batcher.THROTTLED_TIME_KEY)); + .batchRequestThrottled( + Duration.ofMillis(context.getOption(Batcher.THROTTLED_TIME_KEY)).toNanos()); } } return innerCallable.futureCall(request, context); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java index edcda45938..16f9b25c5f 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java @@ -36,6 +36,7 @@ import com.google.bigtable.v2.ReadRowsResponse; import com.google.cloud.bigtable.data.v2.internal.NameUtil; import 
com.google.cloud.bigtable.data.v2.models.RowMutation; +import com.google.cloud.bigtable.data.v2.stub.metrics.NoopMetricsProvider; import com.google.common.base.Preconditions; import com.google.common.io.BaseEncoding; import io.grpc.Attributes; @@ -169,10 +170,13 @@ public void tearDown() { @Test public void testNewClientsShareTransportChannel() throws Exception { - // Create 3 lightweight clients - - try (BigtableDataClientFactory factory = BigtableDataClientFactory.create(defaultSettings); + try (BigtableDataClientFactory factory = + BigtableDataClientFactory.create( + defaultSettings + .toBuilder() + .setMetricsProvider(NoopMetricsProvider.INSTANCE) + .build()); BigtableDataClient ignored1 = factory.createForInstance("project1", "instance1"); BigtableDataClient ignored2 = factory.createForInstance("project2", "instance2"); BigtableDataClient ignored3 = factory.createForInstance("project3", "instance3")) { diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/BuiltinMetricsIT.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/BuiltinMetricsIT.java index 4e75fb8631..484cbb26dc 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/BuiltinMetricsIT.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/BuiltinMetricsIT.java @@ -15,34 +15,64 @@ */ package com.google.cloud.bigtable.data.v2.it; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTestUtils.getAggregatedValue; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTestUtils.getMetricData; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTestUtils.getStartTimeSeconds; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTestUtils.verifyAttributes; +import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertWithMessage; import static com.google.common.truth.TruthJUnit.assume; import com.google.api.client.util.Lists; +import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminClient; import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient; +import com.google.cloud.bigtable.admin.v2.models.AppProfile; +import com.google.cloud.bigtable.admin.v2.models.CreateAppProfileRequest; import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest; import com.google.cloud.bigtable.admin.v2.models.Table; +import com.google.cloud.bigtable.data.v2.BigtableDataClient; import com.google.cloud.bigtable.data.v2.BigtableDataSettings; import com.google.cloud.bigtable.data.v2.models.Query; import com.google.cloud.bigtable.data.v2.models.Row; import com.google.cloud.bigtable.data.v2.models.RowMutation; +import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants; +import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsView; +import com.google.cloud.bigtable.data.v2.stub.metrics.CustomOpenTelemetryMetricsProvider; import com.google.cloud.bigtable.test_helpers.env.EmulatorEnv; import com.google.cloud.bigtable.test_helpers.env.PrefixGenerator; import com.google.cloud.bigtable.test_helpers.env.TestEnvRule; import com.google.cloud.monitoring.v3.MetricServiceClient; import com.google.common.base.Stopwatch; +import com.google.common.collect.BoundType; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Range; import com.google.monitoring.v3.ListTimeSeriesRequest; import com.google.monitoring.v3.ListTimeSeriesResponse; 
+import com.google.monitoring.v3.Point; import com.google.monitoring.v3.ProjectName; import com.google.monitoring.v3.TimeInterval; +import com.google.monitoring.v3.TimeSeries; +import com.google.protobuf.Timestamp; import com.google.protobuf.util.Timestamps; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import java.util.stream.Collectors; +import org.junit.After; +import org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; @@ -50,6 +80,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.threeten.bp.Duration; +import org.threeten.bp.Instant; @RunWith(JUnit4.class) public class BuiltinMetricsIT { @@ -58,71 +89,131 @@ public class BuiltinMetricsIT { private static final Logger logger = Logger.getLogger(BuiltinMetricsIT.class.getName()); @Rule public Timeout globalTimeout = Timeout.seconds(900); - private static Table table; - private static BigtableTableAdminClient tableAdminClient; - private static MetricServiceClient metricClient; + + private Table tableCustomOtel; + private Table tableDefault; + private BigtableDataClient clientCustomOtel; + private BigtableDataClient clientDefault; + private BigtableTableAdminClient tableAdminClient; + private BigtableInstanceAdminClient instanceAdminClient; + private MetricServiceClient metricClient; + + private InMemoryMetricReader metricReader; + private String appProfileCustomOtel; + private String appProfileDefault; public static String[] VIEWS = { "operation_latencies", "attempt_latencies", "connectivity_error_count", - "application_blocking_latencies" + "application_blocking_latencies", }; - @BeforeClass - public static void setUpClass() throws IOException { + @Before + public void setup() throws IOException { + // This test tests 2 things. End-to-end test using the default OTEL instance created by the + // client, and also end-to-end test using a custom OTEL instance set by the customer. In + // both tests, a BigtableCloudMonitoringExporter is created to export data to Cloud Monitoring. 
assume() .withMessage("Builtin metrics integration test is not supported by emulator") .that(testEnvRule.env()) .isNotInstanceOf(EmulatorEnv.class); - // Enable built in metrics - BigtableDataSettings.enableBuiltinMetrics(); - // Create a cloud monitoring client metricClient = MetricServiceClient.create(); tableAdminClient = testEnvRule.env().getTableAdminClient(); + instanceAdminClient = testEnvRule.env().getInstanceAdminClient(); + appProfileCustomOtel = PrefixGenerator.newPrefix("test1"); + appProfileDefault = PrefixGenerator.newPrefix("test2"); + instanceAdminClient.createAppProfile( + CreateAppProfileRequest.of(testEnvRule.env().getInstanceId(), appProfileCustomOtel) + .setRoutingPolicy( + AppProfile.SingleClusterRoutingPolicy.of(testEnvRule.env().getPrimaryClusterId())) + .setIsolationPolicy(AppProfile.StandardIsolationPolicy.of(AppProfile.Priority.LOW))); + instanceAdminClient.createAppProfile( + CreateAppProfileRequest.of(testEnvRule.env().getInstanceId(), appProfileDefault) + .setRoutingPolicy( + AppProfile.SingleClusterRoutingPolicy.of(testEnvRule.env().getPrimaryClusterId())) + .setIsolationPolicy(AppProfile.StandardIsolationPolicy.of(AppProfile.Priority.LOW))); + + // When using the custom OTEL instance, we can also register a InMemoryMetricReader on the + // SdkMeterProvider to verify the data exported on Cloud Monitoring with the in memory metric + // data collected in InMemoryMetricReader. + metricReader = InMemoryMetricReader.create(); + + SdkMeterProviderBuilder meterProvider = + SdkMeterProvider.builder().registerMetricReader(metricReader); + BuiltinMetricsView.registerBuiltinMetrics(testEnvRule.env().getProjectId(), meterProvider); + OpenTelemetry openTelemetry = + OpenTelemetrySdk.builder().setMeterProvider(meterProvider.build()).build(); + + BigtableDataSettings.Builder settings = testEnvRule.env().getDataClientSettings().toBuilder(); + + clientCustomOtel = + BigtableDataClient.create( + settings + .setMetricsProvider(CustomOpenTelemetryMetricsProvider.create(openTelemetry)) + .setAppProfileId(appProfileCustomOtel) + .build()); + clientDefault = BigtableDataClient.create(settings.setAppProfileId(appProfileDefault).build()); } - @AfterClass - public static void tearDown() { + @After + public void tearDown() { if (metricClient != null) { metricClient.close(); } - if (table != null) { - tableAdminClient.deleteTable(table.getId()); + if (tableCustomOtel != null) { + tableAdminClient.deleteTable(tableCustomOtel.getId()); + } + if (tableDefault != null) { + tableAdminClient.deleteTable(tableDefault.getId()); + } + if (instanceAdminClient != null) { + instanceAdminClient.deleteAppProfile( + testEnvRule.env().getInstanceId(), appProfileCustomOtel, true); + instanceAdminClient.deleteAppProfile( + testEnvRule.env().getInstanceId(), appProfileDefault, true); + } + if (clientCustomOtel != null) { + clientCustomOtel.close(); + } + if (clientDefault != null) { + clientDefault.close(); } } @Test - public void testBuiltinMetrics() throws Exception { - logger.info("Started testing builtin metrics"); - table = + public void testBuiltinMetricsWithDefaultOTEL() throws Exception { + logger.info("Started testing builtin metrics with default OTEL"); + tableDefault = tableAdminClient.createTable( - CreateTableRequest.of(PrefixGenerator.newPrefix("BuiltinMetricsIT#test")) + CreateTableRequest.of(PrefixGenerator.newPrefix("BuiltinMetricsIT#test1")) .addFamily("cf")); - logger.info("Create table: " + table.getId()); - // Send a MutateRow and ReadRows request - testEnvRule - .env() - 
.getDataClient() - .mutateRow(RowMutation.create(table.getId(), "a-new-key").setCell("cf", "q", "abc")); + logger.info("Create default table: " + tableDefault.getId()); + + Instant start = Instant.now().minus(Duration.ofSeconds(10)); + + // Send a MutateRow and ReadRows request and measure the latencies for these requests. + clientDefault.mutateRow( + RowMutation.create(tableDefault.getId(), "a-new-key").setCell("cf", "q", "abc")); ArrayList rows = - Lists.newArrayList( - testEnvRule.env().getDataClient().readRows(Query.create(table.getId()).limit(10))); + Lists.newArrayList(clientDefault.readRows(Query.create(tableDefault.getId()).limit(10))); - Stopwatch stopwatch = Stopwatch.createStarted(); + // This stopwatch is used for to limit fetching of metric data in verifyMetrics + Stopwatch metricsPollingStopwatch = Stopwatch.createStarted(); ProjectName name = ProjectName.of(testEnvRule.env().getProjectId()); - // Restrict time to last 10 minutes and 5 minutes after the request - long startMillis = System.currentTimeMillis() - Duration.ofMinutes(10).toMillis(); - long endMillis = startMillis + Duration.ofMinutes(15).toMillis(); + // Interval is set in the monarch request when query metric timestamps. + // Restrict it to before we send to request and 1 minute after we send the request. If + // it turns out to be still flaky we can increase the filter range. + Instant end = Instant.now().plus(Duration.ofMinutes(1)); TimeInterval interval = TimeInterval.newBuilder() - .setStartTime(Timestamps.fromMillis(startMillis)) - .setEndTime(Timestamps.fromMillis(endMillis)) + .setStartTime(Timestamps.fromMillis(start.toEpochMilli())) + .setEndTime(Timestamps.fromMillis(end.toEpochMilli())) .build(); for (String view : VIEWS) { @@ -132,42 +223,123 @@ public void testBuiltinMetrics() throws Exception { String.format( "metric.type=\"bigtable.googleapis.com/client/%s\" " + "AND resource.labels.instance=\"%s\" AND metric.labels.method=\"Bigtable.MutateRow\"" - + " AND resource.labels.table=\"%s\"", - view, testEnvRule.env().getInstanceId(), table.getId()); + + " AND resource.labels.table=\"%s\" AND metric.labels.app_profile=\"%s\"", + view, testEnvRule.env().getInstanceId(), tableDefault.getId(), appProfileDefault); ListTimeSeriesRequest.Builder requestBuilder = ListTimeSeriesRequest.newBuilder() .setName(name.toString()) .setFilter(metricFilter) .setInterval(interval) .setView(ListTimeSeriesRequest.TimeSeriesView.FULL); - - verifyMetricsArePublished(requestBuilder.build(), stopwatch, view); + verifyMetricsArePublished(requestBuilder.build(), metricsPollingStopwatch, view); // Verify that metrics are published for ReadRows request metricFilter = String.format( "metric.type=\"bigtable.googleapis.com/client/%s\" " + "AND resource.labels.instance=\"%s\" AND metric.labels.method=\"Bigtable.ReadRows\"" - + " AND resource.labels.table=\"%s\"", - view, testEnvRule.env().getInstanceId(), table.getId()); + + " AND resource.labels.table=\"%s\" AND metric.labels.app_profile=\"%s\"", + view, testEnvRule.env().getInstanceId(), tableDefault.getId(), appProfileDefault); + requestBuilder.setFilter(metricFilter); + + verifyMetricsArePublished(requestBuilder.build(), metricsPollingStopwatch, view); + } + } + + @Test + public void testBuiltinMetricsWithCustomOTEL() throws Exception { + logger.info("Started testing builtin metrics with custom OTEL"); + tableCustomOtel = + tableAdminClient.createTable( + CreateTableRequest.of(PrefixGenerator.newPrefix("BuiltinMetricsIT#test2")) + .addFamily("cf")); + logger.info("Create custom 
table: " + tableCustomOtel.getId()); + + Instant start = Instant.now().minus(Duration.ofSeconds(10)); + // Send a MutateRow and ReadRows request and measure the latencies for these requests. + clientCustomOtel.mutateRow( + RowMutation.create(tableCustomOtel.getId(), "a-new-key").setCell("cf", "q", "abc")); + ArrayList rows = + Lists.newArrayList( + clientCustomOtel.readRows(Query.create(tableCustomOtel.getId()).limit(10))); + + // This stopwatch is used for to limit fetching of metric data in verifyMetrics + Stopwatch metricsPollingStopwatch = Stopwatch.createStarted(); + + ProjectName name = ProjectName.of(testEnvRule.env().getProjectId()); + + Collection fromMetricReader = metricReader.collectAllMetrics(); + + // Interval is set in the monarch request when query metric timestamps. + // Restrict it to before we send to request and 1 minute after we send the request. If + // it turns out to be still flaky we can increase the filter range. + Instant end = start.plus(Duration.ofMinutes(1)); + TimeInterval interval = + TimeInterval.newBuilder() + .setStartTime(Timestamps.fromMillis(start.toEpochMilli())) + .setEndTime(Timestamps.fromMillis(end.toEpochMilli())) + .build(); + + for (String view : VIEWS) { + String otelMetricName = view; + if (view.equals("application_blocking_latencies")) { + otelMetricName = "application_latencies"; + } + MetricData dataFromReader = getMetricData(fromMetricReader, otelMetricName); + + // Filter on instance and method name + // Verify that metrics are correct for MutateRows request + String metricFilter = + String.format( + "metric.type=\"bigtable.googleapis.com/client/%s\" " + + "AND resource.labels.instance=\"%s\" AND metric.labels.method=\"Bigtable.MutateRow\"" + + " AND resource.labels.table=\"%s\" AND metric.labels.app_profile=\"%s\"", + view, + testEnvRule.env().getInstanceId(), + tableCustomOtel.getId(), + appProfileCustomOtel); + ListTimeSeriesRequest.Builder requestBuilder = + ListTimeSeriesRequest.newBuilder() + .setName(name.toString()) + .setFilter(metricFilter) + .setInterval(interval) + .setView(ListTimeSeriesRequest.TimeSeriesView.FULL); + + ListTimeSeriesResponse response = + verifyMetricsArePublished(requestBuilder.build(), metricsPollingStopwatch, view); + verifyMetricsWithMetricsReader(response, dataFromReader); + + // Verify that metrics are correct for ReadRows request + metricFilter = + String.format( + "metric.type=\"bigtable.googleapis.com/client/%s\" " + + "AND resource.labels.instance=\"%s\" AND metric.labels.method=\"Bigtable.ReadRows\"" + + " AND resource.labels.table=\"%s\" AND metric.labels.app_profile=\"%s\"", + view, + testEnvRule.env().getInstanceId(), + tableCustomOtel.getId(), + appProfileCustomOtel); requestBuilder.setFilter(metricFilter); - verifyMetricsArePublished(requestBuilder.build(), stopwatch, view); + response = verifyMetricsArePublished(requestBuilder.build(), metricsPollingStopwatch, view); + verifyMetricsWithMetricsReader(response, dataFromReader); } } - private void verifyMetricsArePublished( - ListTimeSeriesRequest request, Stopwatch stopwatch, String view) throws Exception { + private ListTimeSeriesResponse verifyMetricsArePublished( + ListTimeSeriesRequest request, Stopwatch metricsPollingStopwatch, String view) + throws Exception { ListTimeSeriesResponse response = metricClient.listTimeSeriesCallable().call(request); - logger.log( - Level.INFO, - "Checking for view " - + view - + ", has timeseries=" - + response.getTimeSeriesCount() - + " stopwatch elapsed " - + stopwatch.elapsed(TimeUnit.MINUTES)); - while 
(response.getTimeSeriesCount() == 0 && stopwatch.elapsed(TimeUnit.MINUTES) < 10) { + while (response.getTimeSeriesCount() == 0 + && metricsPollingStopwatch.elapsed(TimeUnit.MINUTES) < 10) { + logger.log( + Level.INFO, + "Checking for view " + + view + + ", has timeseries=" + + response.getTimeSeriesCount() + + " stopwatch elapsed " + + metricsPollingStopwatch.elapsed(TimeUnit.MINUTES)); // Call listTimeSeries every minute Thread.sleep(Duration.ofMinutes(1).toMillis()); response = metricClient.listTimeSeriesCallable().call(request); @@ -176,5 +348,64 @@ private void verifyMetricsArePublished( assertWithMessage("View " + view + " didn't return any data.") .that(response.getTimeSeriesCount()) .isGreaterThan(0); + + return response; + } + + private void verifyMetricsWithMetricsReader( + ListTimeSeriesResponse response, MetricData dataFromReader) { + for (TimeSeries ts : response.getTimeSeriesList()) { + Map attributesMap = + ImmutableMap.builder() + .putAll(ts.getResource().getLabelsMap()) + .putAll(ts.getMetric().getLabelsMap()) + .build(); + AttributesBuilder attributesBuilder = Attributes.builder(); + String streamingKey = BuiltinMetricsConstants.STREAMING_KEY.getKey(); + attributesMap.forEach( + (k, v) -> { + if (!k.equals(streamingKey)) { + attributesBuilder.put(k, v); + } + }); + if (attributesMap.containsKey(streamingKey)) { + attributesBuilder.put(streamingKey, Boolean.parseBoolean(attributesMap.get(streamingKey))); + } + Attributes attributes = attributesBuilder.build(); + verifyAttributes(dataFromReader, attributes); + long expectedValue = getAggregatedValue(dataFromReader, attributes); + Timestamp startTime = getStartTimeSeconds(dataFromReader, attributes); + assertThat(startTime.getSeconds()).isGreaterThan(0); + List point = + ts.getPointsList().stream() + .filter( + p -> + Timestamps.compare(p.getInterval().getStartTime(), startTime) >= 0 + && Timestamps.compare( + p.getInterval().getStartTime(), + Timestamps.add( + startTime, + com.google.protobuf.Duration.newBuilder() + .setSeconds(60) + .build())) + < 0) + .collect(Collectors.toList()); + if (point.size() > 0) { + long actualValue = (long) point.get(0).getValue().getDistributionValue().getMean(); + assertWithMessage( + "actual value does not match expected value, actual value " + + actualValue + + " expected value " + + expectedValue + + " actual start time " + + point.get(0).getInterval().getStartTime() + + " expected start time " + + startTime) + .that(actualValue) + .isIn( + Range.range( + expectedValue - 1, BoundType.CLOSED, expectedValue + 1, BoundType.CLOSED)); + } + } } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/StreamingMetricsMetadataIT.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/StreamingMetricsMetadataIT.java index b0e12d5ade..d898c882ac 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/StreamingMetricsMetadataIT.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/StreamingMetricsMetadataIT.java @@ -21,31 +21,66 @@ import com.google.api.core.ApiFuture; import com.google.api.gax.rpc.NotFoundException; import com.google.cloud.bigtable.admin.v2.models.Cluster; +import com.google.cloud.bigtable.data.v2.BigtableDataClient; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; import com.google.cloud.bigtable.data.v2.models.Query; import com.google.cloud.bigtable.data.v2.models.Row; -import com.google.cloud.bigtable.stats.BuiltinViews; -import 
com.google.cloud.bigtable.stats.StatsWrapper; +import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants; +import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsView; +import com.google.cloud.bigtable.data.v2.stub.metrics.CustomOpenTelemetryMetricsProvider; import com.google.cloud.bigtable.test_helpers.env.EmulatorEnv; import com.google.cloud.bigtable.test_helpers.env.TestEnvRule; import com.google.common.collect.Lists; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.PointData; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; -import org.junit.BeforeClass; +import java.util.stream.Collectors; +import org.junit.After; +import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; public class StreamingMetricsMetadataIT { @ClassRule public static TestEnvRule testEnvRule = new TestEnvRule(); - @BeforeClass - public static void setUpClass() { + private BigtableDataClient client; + private InMemoryMetricReader metricReader; + + @Before + public void setup() throws IOException { assume() .withMessage("StreamingMetricsMetadataIT is not supported on Emulator") .that(testEnvRule.env()) .isNotInstanceOf(EmulatorEnv.class); - BuiltinViews.registerBigtableBuiltinViews(); + + BigtableDataSettings.Builder settings = testEnvRule.env().getDataClientSettings().toBuilder(); + + metricReader = InMemoryMetricReader.create(); + + SdkMeterProviderBuilder meterProvider = + SdkMeterProvider.builder().registerMetricReader(metricReader); + BuiltinMetricsView.registerBuiltinMetrics(testEnvRule.env().getProjectId(), meterProvider); + OpenTelemetry openTelemetry = + OpenTelemetrySdk.builder().setMeterProvider(meterProvider.build()).build(); + + settings.setMetricsProvider(CustomOpenTelemetryMetricsProvider.create(openTelemetry)); + client = BigtableDataClient.create(settings.build()); + } + + @After + public void tearDown() throws IOException { + if (client != null) { + client.close(); + } } @Test @@ -54,7 +89,7 @@ public void testSuccess() throws Exception { String uniqueKey = prefix + "-read"; Query query = Query.create(testEnvRule.env().getTableId()).rowKey(uniqueKey); - ArrayList rows = Lists.newArrayList(testEnvRule.env().getDataClient().readRows(query)); + ArrayList rows = Lists.newArrayList(client.readRows(query)); ApiFuture> clustersFuture = testEnvRule @@ -64,27 +99,55 @@ public void testSuccess() throws Exception { List clusters = clustersFuture.get(1, TimeUnit.MINUTES); - // give opencensus some time to populate view data - Thread.sleep(100); + List metrics = + metricReader.collectAllMetrics().stream() + .filter(m -> m.getName().contains(BuiltinMetricsConstants.OPERATION_LATENCIES_NAME)) + .collect(Collectors.toList()); + + assertThat(metrics.size()).isEqualTo(1); - List tagValueStrings = StatsWrapper.getOperationLatencyViewTagValueStrings(); - assertThat(tagValueStrings).contains(clusters.get(0).getZone()); - assertThat(tagValueStrings).contains(clusters.get(0).getId()); + MetricData metricData = metrics.get(0); + List pointData = new ArrayList<>(metricData.getData().getPoints()); + List clusterAttributes = + pointData.stream() + .map(pd -> 
pd.getAttributes().get(BuiltinMetricsConstants.CLUSTER_ID_KEY)) + .collect(Collectors.toList()); + List zoneAttributes = + pointData.stream() + .map(pd -> pd.getAttributes().get(BuiltinMetricsConstants.ZONE_ID_KEY)) + .collect(Collectors.toList()); + + assertThat(clusterAttributes).contains(clusters.get(0).getId()); + assertThat(zoneAttributes).contains(clusters.get(0).getZone()); } @Test - public void testFailure() throws InterruptedException { + public void testFailure() { Query query = Query.create("non-exist-table"); try { - Lists.newArrayList(testEnvRule.env().getDataClient().readRows(query)); + Lists.newArrayList(client.readRows(query)); } catch (NotFoundException e) { } - // give opencensus some time to populate view data - Thread.sleep(100); + List metrics = + metricReader.collectAllMetrics().stream() + .filter(m -> m.getName().contains(BuiltinMetricsConstants.OPERATION_LATENCIES_NAME)) + .collect(Collectors.toList()); + + assertThat(metrics.size()).isEqualTo(1); + + MetricData metricData = metrics.get(0); + List pointData = new ArrayList<>(metricData.getData().getPoints()); + List clusterAttributes = + pointData.stream() + .map(pd -> pd.getAttributes().get(BuiltinMetricsConstants.CLUSTER_ID_KEY)) + .collect(Collectors.toList()); + List zoneAttributes = + pointData.stream() + .map(pd -> pd.getAttributes().get(BuiltinMetricsConstants.ZONE_ID_KEY)) + .collect(Collectors.toList()); - List tagValueStrings = StatsWrapper.getOperationLatencyViewTagValueStrings(); - assertThat(tagValueStrings).contains("unspecified"); - assertThat(tagValueStrings).contains("global"); + assertThat(clusterAttributes).contains("unspecified"); + assertThat(zoneAttributes).contains("global"); } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/UnaryMetricsMetadataIT.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/UnaryMetricsMetadataIT.java index aa2a4317fc..8efd2372b3 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/UnaryMetricsMetadataIT.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/UnaryMetricsMetadataIT.java @@ -21,29 +21,66 @@ import com.google.api.core.ApiFuture; import com.google.api.gax.rpc.NotFoundException; import com.google.cloud.bigtable.admin.v2.models.Cluster; +import com.google.cloud.bigtable.data.v2.BigtableDataClient; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; import com.google.cloud.bigtable.data.v2.models.RowMutation; -import com.google.cloud.bigtable.stats.BuiltinViews; -import com.google.cloud.bigtable.stats.StatsWrapper; +import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants; +import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsView; +import com.google.cloud.bigtable.data.v2.stub.metrics.CustomOpenTelemetryMetricsProvider; import com.google.cloud.bigtable.test_helpers.env.EmulatorEnv; import com.google.cloud.bigtable.test_helpers.env.TestEnvRule; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.PointData; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutionException; import 
java.util.concurrent.TimeUnit; -import org.junit.BeforeClass; +import java.util.stream.Collectors; +import org.junit.After; +import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; public class UnaryMetricsMetadataIT { @ClassRule public static TestEnvRule testEnvRule = new TestEnvRule(); - @BeforeClass - public static void setUpClass() { + private BigtableDataClient client; + private InMemoryMetricReader metricReader; + + @Before + public void setup() throws IOException { assume() .withMessage("UnaryMetricsMetadataIT is not supported on Emulator") .that(testEnvRule.env()) .isNotInstanceOf(EmulatorEnv.class); - BuiltinViews.registerBigtableBuiltinViews(); + + BigtableDataSettings.Builder settings = testEnvRule.env().getDataClientSettings().toBuilder(); + + metricReader = InMemoryMetricReader.create(); + + SdkMeterProviderBuilder meterProvider = + SdkMeterProvider.builder().registerMetricReader(metricReader); + BuiltinMetricsView.registerBuiltinMetrics(testEnvRule.env().getProjectId(), meterProvider); + OpenTelemetry openTelemetry = + OpenTelemetrySdk.builder().setMeterProvider(meterProvider.build()).build(); + + settings.setMetricsProvider(CustomOpenTelemetryMetricsProvider.create(openTelemetry)); + + client = BigtableDataClient.create(settings.build()); + } + + @After + public void tearDown() throws IOException { + if (client != null) { + client.close(); + } } @Test @@ -52,9 +89,7 @@ public void testSuccess() throws Exception { String familyId = testEnvRule.env().getFamilyId(); ApiFuture future = - testEnvRule - .env() - .getDataClient() + client .mutateRowCallable() .futureCall( RowMutation.create(testEnvRule.env().getTableId(), rowKey) @@ -69,18 +104,26 @@ public void testSuccess() throws Exception { .listClustersAsync(testEnvRule.env().getInstanceId()); List clusters = clustersFuture.get(1, TimeUnit.MINUTES); - // give opencensus some time to populate view data - for (int i = 0; i < 10; i++) { - if (StatsWrapper.getOperationLatencyViewTagValueStrings() - .contains(clusters.get(0).getZone())) { - break; - } - Thread.sleep(100); - } + List metrics = + metricReader.collectAllMetrics().stream() + .filter(m -> m.getName().contains(BuiltinMetricsConstants.OPERATION_LATENCIES_NAME)) + .collect(Collectors.toList()); + + assertThat(metrics.size()).isEqualTo(1); - List tagValueStrings = StatsWrapper.getOperationLatencyViewTagValueStrings(); - assertThat(tagValueStrings).contains(clusters.get(0).getZone()); - assertThat(tagValueStrings).contains(clusters.get(0).getId()); + MetricData metricData = metrics.get(0); + List pointData = new ArrayList<>(metricData.getData().getPoints()); + List clusterAttributes = + pointData.stream() + .map(pd -> pd.getAttributes().get(BuiltinMetricsConstants.CLUSTER_ID_KEY)) + .collect(Collectors.toList()); + List zoneAttributes = + pointData.stream() + .map(pd -> pd.getAttributes().get(BuiltinMetricsConstants.ZONE_ID_KEY)) + .collect(Collectors.toList()); + + assertThat(clusterAttributes).contains(clusters.get(0).getId()); + assertThat(zoneAttributes).contains(clusters.get(0).getZone()); } @Test @@ -89,9 +132,7 @@ public void testFailure() throws Exception { String familyId = testEnvRule.env().getFamilyId(); ApiFuture future = - testEnvRule - .env() - .getDataClient() + client .mutateRowCallable() .futureCall( RowMutation.create("non-exist-table", rowKey).setCell(familyId, "q", "myVal")); @@ -106,16 +147,25 @@ public void testFailure() throws Exception { } } - // give opencensus some time to populate view data - for (int i = 0; i < 10; i++) { - if 
(StatsWrapper.getOperationLatencyViewTagValueStrings().contains("unspecified")) { - break; - } - Thread.sleep(100); - } + List metrics = + metricReader.collectAllMetrics().stream() + .filter(m -> m.getName().contains(BuiltinMetricsConstants.OPERATION_LATENCIES_NAME)) + .collect(Collectors.toList()); + + assertThat(metrics.size()).isEqualTo(1); + + MetricData metricData = metrics.get(0); + List pointData = new ArrayList<>(metricData.getData().getPoints()); + List clusterAttributes = + pointData.stream() + .map(pd -> pd.getAttributes().get(BuiltinMetricsConstants.CLUSTER_ID_KEY)) + .collect(Collectors.toList()); + List zoneAttributes = + pointData.stream() + .map(pd -> pd.getAttributes().get(BuiltinMetricsConstants.ZONE_ID_KEY)) + .collect(Collectors.toList()); - List tagValueStrings = StatsWrapper.getOperationLatencyViewTagValueStrings(); - assertThat(tagValueStrings).contains("unspecified"); - assertThat(tagValueStrings).contains("global"); + assertThat(clusterAttributes).contains("unspecified"); + assertThat(zoneAttributes).contains("global"); } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java index a57d42f6f1..ca9ae7b00a 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java @@ -884,6 +884,7 @@ public void enableRetryInfoFalseValueTest() throws IOException { "generateInitialChangeStreamPartitionsSettings", "readChangeStreamSettings", "pingAndWarmSettings", + "metricsProvider", }; @Test diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableCloudMonitoringExporterTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableCloudMonitoringExporterTest.java index 0f3bed1d90..057ccdc353 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableCloudMonitoringExporterTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableCloudMonitoringExporterTest.java @@ -15,13 +15,13 @@ */ package com.google.cloud.bigtable.data.v2.stub.metrics; -import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsAttributes.APP_PROFILE; -import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsAttributes.CLIENT_UID; -import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsAttributes.CLUSTER_ID; -import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsAttributes.INSTANCE_ID; -import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsAttributes.PROJECT_ID; -import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsAttributes.TABLE_ID; -import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsAttributes.ZONE_ID; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.APP_PROFILE_KEY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.CLIENT_UID_KEY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.CLUSTER_ID_KEY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.INSTANCE_ID_KEY; +import static 
com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.PROJECT_ID_KEY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.TABLE_ID_KEY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.ZONE_ID_KEY; import static com.google.common.truth.Truth.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -93,17 +93,17 @@ public void setUp() { attributes = Attributes.builder() - .put(PROJECT_ID, projectId) - .put(INSTANCE_ID, instanceId) - .put(TABLE_ID, tableId) - .put(CLUSTER_ID, cluster) - .put(ZONE_ID, zone) - .put(APP_PROFILE, appProfileId) + .put(PROJECT_ID_KEY, projectId) + .put(INSTANCE_ID_KEY, instanceId) + .put(TABLE_ID_KEY, tableId) + .put(CLUSTER_ID_KEY, cluster) + .put(ZONE_ID_KEY, zone) + .put(APP_PROFILE_KEY, appProfileId) .build(); resource = Resource.create(Attributes.empty()); - scope = InstrumentationScopeInfo.create("bigtable.googleapis.com"); + scope = InstrumentationScopeInfo.create(BuiltinMetricsConstants.METER_NAME); } @After @@ -146,16 +146,17 @@ public void testExportingSumData() { assertThat(timeSeries.getResource().getLabelsMap()) .containsExactly( - PROJECT_ID.getKey(), projectId, - INSTANCE_ID.getKey(), instanceId, - TABLE_ID.getKey(), tableId, - CLUSTER_ID.getKey(), cluster, - ZONE_ID.getKey(), zone); + PROJECT_ID_KEY.getKey(), projectId, + INSTANCE_ID_KEY.getKey(), instanceId, + TABLE_ID_KEY.getKey(), tableId, + CLUSTER_ID_KEY.getKey(), cluster, + ZONE_ID_KEY.getKey(), zone); assertThat(timeSeries.getMetric().getLabelsMap()).hasSize(2); assertThat(timeSeries.getMetric().getLabelsMap()) - .containsAtLeast(APP_PROFILE.getKey(), appProfileId); - assertThat(timeSeries.getMetric().getLabelsMap()).containsAtLeast(CLIENT_UID.getKey(), taskId); + .containsAtLeast(APP_PROFILE_KEY.getKey(), appProfileId); + assertThat(timeSeries.getMetric().getLabelsMap()) + .containsAtLeast(CLIENT_UID_KEY.getKey(), taskId); assertThat(timeSeries.getPoints(0).getValue().getInt64Value()).isEqualTo(fakeValue); assertThat(timeSeries.getPoints(0).getInterval().getStartTime().getNanos()) .isEqualTo(startEpoch); @@ -207,16 +208,17 @@ public void testExportingHistogramData() { assertThat(timeSeries.getResource().getLabelsMap()) .containsExactly( - PROJECT_ID.getKey(), projectId, - INSTANCE_ID.getKey(), instanceId, - TABLE_ID.getKey(), tableId, - CLUSTER_ID.getKey(), cluster, - ZONE_ID.getKey(), zone); + PROJECT_ID_KEY.getKey(), projectId, + INSTANCE_ID_KEY.getKey(), instanceId, + TABLE_ID_KEY.getKey(), tableId, + CLUSTER_ID_KEY.getKey(), cluster, + ZONE_ID_KEY.getKey(), zone); assertThat(timeSeries.getMetric().getLabelsMap()).hasSize(2); assertThat(timeSeries.getMetric().getLabelsMap()) - .containsAtLeast(APP_PROFILE.getKey(), appProfileId); - assertThat(timeSeries.getMetric().getLabelsMap()).containsAtLeast(CLIENT_UID.getKey(), taskId); + .containsAtLeast(APP_PROFILE_KEY.getKey(), appProfileId); + assertThat(timeSeries.getMetric().getLabelsMap()) + .containsAtLeast(CLIENT_UID_KEY.getKey(), taskId); Distribution distribution = timeSeries.getPoints(0).getValue().getDistributionValue(); assertThat(distribution.getCount()).isEqualTo(3); assertThat(timeSeries.getPoints(0).getInterval().getStartTime().getNanos()) diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTestUtils.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTestUtils.java new file mode 100644 index 
0000000000..af402952da --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTestUtils.java @@ -0,0 +1,107 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.core.InternalApi; +import com.google.protobuf.Timestamp; +import com.google.protobuf.util.Timestamps; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.metrics.data.HistogramPointData; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +@InternalApi +public class BuiltinMetricsTestUtils { + + private BuiltinMetricsTestUtils() {} + + public static MetricData getMetricData(Collection allMetricData, String metricName) { + List metricDataList = + allMetricData.stream() + .filter(md -> md.getName().equals(BuiltinMetricsConstants.METER_NAME + metricName)) + .collect(Collectors.toList()); + if (metricDataList.size() == 0) { + allMetricData.stream().forEach(md -> System.out.println(md.getName())); + } + assertThat(metricDataList.size()).isEqualTo(1); + + return metricDataList.get(0); + } + + public static long getAggregatedValue(MetricData metricData, Attributes attributes) { + switch (metricData.getType()) { + case HISTOGRAM: + HistogramPointData hd = + metricData.getHistogramData().getPoints().stream() + .filter(pd -> pd.getAttributes().equals(attributes)) + .collect(Collectors.toList()) + .get(0); + return (long) hd.getSum() / hd.getCount(); + case LONG_SUM: + LongPointData ld = + metricData.getLongSumData().getPoints().stream() + .filter(pd -> pd.getAttributes().equals(attributes)) + .collect(Collectors.toList()) + .get(0); + return ld.getValue(); + } + return 0; + } + + public static Timestamp getStartTimeSeconds(MetricData metricData, Attributes attributes) { + switch (metricData.getType()) { + case HISTOGRAM: + HistogramPointData hd = + metricData.getHistogramData().getPoints().stream() + .filter(pd -> pd.getAttributes().equals(attributes)) + .collect(Collectors.toList()) + .get(0); + return Timestamps.fromNanos(hd.getStartEpochNanos()); + case LONG_SUM: + LongPointData ld = + metricData.getLongSumData().getPoints().stream() + .filter(pd -> pd.getAttributes().equals(attributes)) + .collect(Collectors.toList()) + .get(0); + return Timestamps.fromNanos(ld.getStartEpochNanos()); + } + return Timestamp.getDefaultInstance(); + } + + public static void verifyAttributes(MetricData metricData, Attributes attributes) { + switch (metricData.getType()) { + case HISTOGRAM: + List hd = + metricData.getHistogramData().getPoints().stream() + .filter(pd -> pd.getAttributes().equals(attributes)) + .collect(Collectors.toList()); + assertThat(hd.size()).isGreaterThan(0); + break; + case LONG_SUM: + List ld = + 
metricData.getLongSumData().getPoints().stream() + .filter(pd -> pd.getAttributes().equals(attributes)) + .collect(Collectors.toList()); + assertThat(ld.size()).isGreaterThan(0); + break; + } + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java index c2be1ea0ff..3a601e5036 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java @@ -15,13 +15,24 @@ */ package com.google.cloud.bigtable.data.v2.stub.metrics; -import static com.google.api.gax.tracing.ApiTracerFactory.OperationType; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.APPLICATION_BLOCKING_LATENCIES_NAME; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.ATTEMPT_LATENCIES_NAME; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.CLIENT_BLOCKING_LATENCIES_NAME; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.CLIENT_NAME_KEY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.CLUSTER_ID_KEY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.CONNECTIVITY_ERROR_COUNT_NAME; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.METHOD_KEY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.OPERATION_LATENCIES_NAME; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.RETRY_COUNT_NAME; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.SERVER_LATENCIES_NAME; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.STATUS_KEY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.STREAMING_KEY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.TABLE_ID_KEY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.ZONE_ID_KEY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTestUtils.getAggregatedValue; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTestUtils.getMetricData; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTestUtils.verifyAttributes; import static com.google.common.truth.Truth.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import com.google.api.client.util.Lists; import com.google.api.core.ApiFunction; @@ -34,7 +45,6 @@ import com.google.api.gax.rpc.NotFoundException; import com.google.api.gax.rpc.ResponseObserver; import com.google.api.gax.rpc.StreamController; -import com.google.api.gax.tracing.SpanName; import com.google.bigtable.v2.BigtableGrpc; import com.google.bigtable.v2.MutateRowRequest; import com.google.bigtable.v2.MutateRowResponse; @@ -51,7 +61,6 @@ import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub; import 
com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings; -import com.google.cloud.bigtable.stats.StatsRecorderWrapper; import com.google.common.base.Stopwatch; import com.google.common.collect.Range; import com.google.protobuf.ByteString; @@ -74,10 +83,20 @@ import io.grpc.StatusRuntimeException; import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.InstrumentSelector; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; +import io.opentelemetry.sdk.metrics.View; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -88,12 +107,8 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.Mock; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; -import org.mockito.stubbing.Answer; import org.threeten.bp.Duration; @RunWith(JUnit4.class) @@ -101,7 +116,7 @@ public class BuiltinMetricsTracerTest { private static final String PROJECT_ID = "fake-project"; private static final String INSTANCE_ID = "fake-instance"; private static final String APP_PROFILE_ID = "default"; - private static final String TABLE_ID = "fake-table"; + private static final String TABLE = "fake-table"; private static final String BAD_TABLE_ID = "non-exist-table"; private static final String ZONE = "us-west-1"; @@ -120,18 +135,35 @@ public class BuiltinMetricsTracerTest { private EnhancedBigtableStub stub; - @Mock private BuiltinMetricsTracerFactory mockFactory; - @Mock private StatsRecorderWrapper statsRecorderWrapper; + private int batchElementCount = 2; - @Captor private ArgumentCaptor status; - @Captor private ArgumentCaptor tableId; - @Captor private ArgumentCaptor zone; - @Captor private ArgumentCaptor cluster; + private Attributes baseAttributes; - private int batchElementCount = 2; + private InMemoryMetricReader metricReader; @Before public void setUp() throws Exception { + metricReader = InMemoryMetricReader.create(); + + baseAttributes = + Attributes.builder() + .put(BuiltinMetricsConstants.PROJECT_ID_KEY, PROJECT_ID) + .put(BuiltinMetricsConstants.INSTANCE_ID_KEY, INSTANCE_ID) + .put(BuiltinMetricsConstants.APP_PROFILE_KEY, APP_PROFILE_ID) + .build(); + + SdkMeterProviderBuilder meterProvider = + SdkMeterProvider.builder().registerMetricReader(metricReader); + + for (Map.Entry entry : + BuiltinMetricsConstants.getAllViews().entrySet()) { + meterProvider.registerView(entry.getKey(), entry.getValue()); + } + + OpenTelemetrySdk otel = + OpenTelemetrySdk.builder().setMeterProvider(meterProvider.build()).build(); + BuiltinMetricsTracerFactory facotry = BuiltinMetricsTracerFactory.create(otel, baseAttributes); + // Add an interceptor to add server-timing in headers ServerInterceptor trailersInterceptor = new ServerInterceptor() { @@ -212,7 +244,8 @@ public void sendMessage(ReqT message) { .setMaxOutstandingRequestBytes(1001L) .build()) .build()); - 
stubSettingsBuilder.setTracerFactory(mockFactory); + + stubSettingsBuilder.setTracerFactory(facotry); InstantiatingGrpcChannelProvider.Builder channelProvider = ((InstantiatingGrpcChannelProvider) stubSettingsBuilder.getTransportChannelProvider()) @@ -243,86 +276,90 @@ public void tearDown() { @Test public void testReadRowsOperationLatencies() { - when(mockFactory.newTracer(any(), any(), any())) - .thenAnswer( - (Answer) - invocationOnMock -> - new BuiltinMetricsTracer( - OperationType.ServerStreaming, - SpanName.of("Bigtable", "ReadRows"), - statsRecorderWrapper)); - ArgumentCaptor operationLatency = ArgumentCaptor.forClass(Long.class); - Stopwatch stopwatch = Stopwatch.createStarted(); - Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID)).iterator()); + Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE)).iterator()); long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); - verify(statsRecorderWrapper).putOperationLatencies(operationLatency.capture()); - // verify record operation is only called once - verify(statsRecorderWrapper) - .recordOperation(status.capture(), tableId.capture(), zone.capture(), cluster.capture()); + Attributes expectedAttributes = + baseAttributes + .toBuilder() + .put(STATUS_KEY, "OK") + .put(TABLE_ID_KEY, TABLE) + .put(ZONE_ID_KEY, ZONE) + .put(CLUSTER_ID_KEY, CLUSTER) + .put(METHOD_KEY, "Bigtable.ReadRows") + .put(STREAMING_KEY, true) + .put(CLIENT_NAME_KEY, "java-bigtable") + .build(); + + Collection allMetricData = metricReader.collectAllMetrics(); + + MetricData metricData = getMetricData(allMetricData, OPERATION_LATENCIES_NAME); - assertThat(operationLatency.getValue()).isIn(Range.closed(SERVER_LATENCY, elapsed)); - assertThat(status.getAllValues()).containsExactly("OK"); - assertThat(tableId.getAllValues()).containsExactly(TABLE_ID); - assertThat(zone.getAllValues()).containsExactly(ZONE); - assertThat(cluster.getAllValues()).containsExactly(CLUSTER); + long value = getAggregatedValue(metricData, expectedAttributes); + assertThat(value).isIn(Range.closed(SERVER_LATENCY, elapsed)); } @Test public void testGfeMetrics() { - when(mockFactory.newTracer(any(), any(), any())) - .thenAnswer( - (Answer) - invocationOnMock -> - new BuiltinMetricsTracer( - OperationType.ServerStreaming, - SpanName.of("Bigtable", "ReadRows"), - statsRecorderWrapper)); - ArgumentCaptor gfeLatency = ArgumentCaptor.forClass(Long.class); - ArgumentCaptor gfeMissingHeaders = ArgumentCaptor.forClass(Long.class); - - Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID))); - - // Verify record attempt are called multiple times - verify(statsRecorderWrapper, times(fakeService.getAttemptCounter().get())) - .recordAttempt(status.capture(), tableId.capture(), zone.capture(), cluster.capture()); - - // The request was retried and gfe latency is only recorded in the retry attempt - verify(statsRecorderWrapper).putGfeLatencies(gfeLatency.capture()); - assertThat(gfeLatency.getValue()).isEqualTo(FAKE_SERVER_TIMING); - - // The first time the request was retried, it'll increment missing header counter - verify(statsRecorderWrapper, times(fakeService.getAttemptCounter().get())) - .putGfeMissingHeaders(gfeMissingHeaders.capture()); - assertThat(gfeMissingHeaders.getAllValues()).containsExactly(1L, 0L); - - assertThat(status.getAllValues()).containsExactly("UNAVAILABLE", "OK"); - assertThat(tableId.getAllValues()).containsExactly(TABLE_ID, TABLE_ID); - assertThat(zone.getAllValues()).containsExactly("global", ZONE); - 
assertThat(cluster.getAllValues()).containsExactly("unspecified", CLUSTER); + Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE))); + + Attributes expectedAttributes = + baseAttributes + .toBuilder() + .put(STATUS_KEY, "OK") + .put(TABLE_ID_KEY, TABLE) + .put(ZONE_ID_KEY, ZONE) + .put(CLUSTER_ID_KEY, CLUSTER) + .put(CLIENT_NAME_KEY, "java-bigtable") + .put(METHOD_KEY, "Bigtable.ReadRows") + .build(); + + Collection allMetricData = metricReader.collectAllMetrics(); + + MetricData serverLatenciesMetricData = getMetricData(allMetricData, SERVER_LATENCIES_NAME); + + long serverLatencies = getAggregatedValue(serverLatenciesMetricData, expectedAttributes); + assertThat(serverLatencies).isEqualTo(FAKE_SERVER_TIMING); + + MetricData connectivityErrorCountMetricData = + getMetricData(allMetricData, CONNECTIVITY_ERROR_COUNT_NAME); + Attributes expected1 = + baseAttributes + .toBuilder() + .put(STATUS_KEY, "UNAVAILABLE") + .put(TABLE_ID_KEY, TABLE) + .put(ZONE_ID_KEY, "global") + .put(CLUSTER_ID_KEY, "unspecified") + .put(METHOD_KEY, "Bigtable.ReadRows") + .put(CLIENT_NAME_KEY, "java-bigtable") + .build(); + Attributes expected2 = + baseAttributes + .toBuilder() + .put(STATUS_KEY, "OK") + .put(TABLE_ID_KEY, TABLE) + .put(ZONE_ID_KEY, ZONE) + .put(CLUSTER_ID_KEY, CLUSTER) + .put(METHOD_KEY, "Bigtable.ReadRows") + .put(CLIENT_NAME_KEY, "java-bigtable") + .build(); + + verifyAttributes(connectivityErrorCountMetricData, expected1); + verifyAttributes(connectivityErrorCountMetricData, expected2); + + assertThat(getAggregatedValue(connectivityErrorCountMetricData, expected1)).isEqualTo(1); + assertThat(getAggregatedValue(connectivityErrorCountMetricData, expected2)).isEqualTo(0); } @Test public void testReadRowsApplicationLatencyWithAutoFlowControl() throws Exception { - when(mockFactory.newTracer(any(), any(), any())) - .thenAnswer( - (Answer) - invocationOnMock -> - new BuiltinMetricsTracer( - OperationType.ServerStreaming, - SpanName.of("Bigtable", "ReadRows"), - statsRecorderWrapper)); - - ArgumentCaptor applicationLatency = ArgumentCaptor.forClass(Long.class); - ArgumentCaptor operationLatency = ArgumentCaptor.forClass(Long.class); - final SettableApiFuture future = SettableApiFuture.create(); final AtomicInteger counter = new AtomicInteger(0); // For auto flow control, application latency is the time application spent in onResponse. stub.readRowsCallable() .call( - Query.create(TABLE_ID), + Query.create(TABLE), new ResponseObserver() { @Override public void onStart(StreamController streamController) {} @@ -348,37 +385,38 @@ public void onComplete() { }); future.get(); - verify(statsRecorderWrapper).putApplicationLatencies(applicationLatency.capture()); - verify(statsRecorderWrapper).putOperationLatencies(operationLatency.capture()); - verify(statsRecorderWrapper) - .recordOperation(status.capture(), tableId.capture(), zone.capture(), cluster.capture()); - assertThat(counter.get()).isEqualTo(fakeService.getResponseCounter().get()); - // Thread.sleep might not sleep for the requested amount depending on the interrupt period - // defined by the OS. - // On linux this is ~1ms but on windows may be as high as 15-20ms. 
- assertThat(applicationLatency.getValue()) - .isAtLeast((APPLICATION_LATENCY - SLEEP_VARIABILITY) * counter.get()); - assertThat(applicationLatency.getValue()) - .isAtMost(operationLatency.getValue() - SERVER_LATENCY); + + Collection allMetricData = metricReader.collectAllMetrics(); + MetricData applicationLatency = + getMetricData(allMetricData, APPLICATION_BLOCKING_LATENCIES_NAME); + + Attributes expectedAttributes = + baseAttributes + .toBuilder() + .put(TABLE_ID_KEY, TABLE) + .put(ZONE_ID_KEY, ZONE) + .put(CLUSTER_ID_KEY, CLUSTER) + .put(CLIENT_NAME_KEY, "java-bigtable") + .put(METHOD_KEY, "Bigtable.ReadRows") + .build(); + long value = getAggregatedValue(applicationLatency, expectedAttributes); + + assertThat(value).isAtLeast((APPLICATION_LATENCY - SLEEP_VARIABILITY) * counter.get()); + + MetricData operationLatency = getMetricData(allMetricData, OPERATION_LATENCIES_NAME); + long operationLatencyValue = + getAggregatedValue( + operationLatency, + expectedAttributes.toBuilder().put(STATUS_KEY, "OK").put(STREAMING_KEY, true).build()); + assertThat(value).isAtMost(operationLatencyValue - SERVER_LATENCY); } @Test public void testReadRowsApplicationLatencyWithManualFlowControl() throws Exception { - when(mockFactory.newTracer(any(), any(), any())) - .thenAnswer( - (Answer) - invocationOnMock -> - new BuiltinMetricsTracer( - OperationType.ServerStreaming, - SpanName.of("Bigtable", "ReadRows"), - statsRecorderWrapper)); - - ArgumentCaptor applicationLatency = ArgumentCaptor.forClass(Long.class); - ArgumentCaptor operationLatency = ArgumentCaptor.forClass(Long.class); int counter = 0; - Iterator rows = stub.readRowsCallable().call(Query.create(TABLE_ID)).iterator(); + Iterator rows = stub.readRowsCallable().call(Query.create(TABLE)).iterator(); while (rows.hasNext()) { counter++; @@ -386,99 +424,130 @@ public void testReadRowsApplicationLatencyWithManualFlowControl() throws Excepti rows.next(); } - verify(statsRecorderWrapper).putApplicationLatencies(applicationLatency.capture()); - verify(statsRecorderWrapper).putOperationLatencies(operationLatency.capture()); - verify(statsRecorderWrapper) - .recordOperation(status.capture(), tableId.capture(), zone.capture(), cluster.capture()); + Collection allMetricData = metricReader.collectAllMetrics(); + MetricData applicationLatency = + getMetricData(allMetricData, APPLICATION_BLOCKING_LATENCIES_NAME); + + Attributes expectedAttributes = + baseAttributes + .toBuilder() + .put(TABLE_ID_KEY, TABLE) + .put(ZONE_ID_KEY, ZONE) + .put(CLUSTER_ID_KEY, CLUSTER) + .put(CLIENT_NAME_KEY, "java-bigtable") + .put(METHOD_KEY, "Bigtable.ReadRows") + .build(); - // For manual flow control, the last application latency shouldn't count, because at that point - // the server already sent back all the responses. + long value = getAggregatedValue(applicationLatency, expectedAttributes); + // For manual flow control, the last application latency shouldn't count, because at that + // point the server already sent back all the responses. 
assertThat(counter).isEqualTo(fakeService.getResponseCounter().get()); - assertThat(applicationLatency.getValue()) - .isAtLeast(APPLICATION_LATENCY * (counter - 1) - SERVER_LATENCY); - assertThat(applicationLatency.getValue()) - .isAtMost(operationLatency.getValue() - SERVER_LATENCY); + assertThat(value).isAtLeast(APPLICATION_LATENCY * (counter - 1) - SERVER_LATENCY); + + MetricData operationLatency = getMetricData(allMetricData, OPERATION_LATENCIES_NAME); + long operationLatencyValue = + getAggregatedValue( + operationLatency, + expectedAttributes.toBuilder().put(STATUS_KEY, "OK").put(STREAMING_KEY, true).build()); + assertThat(value).isAtMost(operationLatencyValue - SERVER_LATENCY); } @Test - public void testRetryCount() { - when(mockFactory.newTracer(any(), any(), any())) - .thenAnswer( - (Answer) - invocationOnMock -> - new BuiltinMetricsTracer( - OperationType.ServerStreaming, - SpanName.of("Bigtable", "MutateRow"), - statsRecorderWrapper)); - - ArgumentCaptor retryCount = ArgumentCaptor.forClass(Integer.class); - + public void testRetryCount() throws InterruptedException { stub.mutateRowCallable() - .call(RowMutation.create(TABLE_ID, "random-row").setCell("cf", "q", "value")); - - // In TracedUnaryCallable, we create a future and add a TraceFinisher to the callback. Main - // thread is blocked on waiting for the future to be completed. When onComplete is called on - // the grpc thread, the future is completed, however we might not have enough time for - // TraceFinisher to run. Add a 1 second time out to wait for the callback. This shouldn't have - // any impact on production code. - verify(statsRecorderWrapper, timeout(1000)).putRetryCount(retryCount.capture()); + .call(RowMutation.create(TABLE, "random-row").setCell("cf", "q", "value")); + + Collection allMetricData = metricReader.collectAllMetrics(); + MetricData metricData = getMetricData(allMetricData, RETRY_COUNT_NAME); + Attributes expectedAttributes = + baseAttributes + .toBuilder() + .put(TABLE_ID_KEY, TABLE) + .put(ZONE_ID_KEY, ZONE) + .put(CLUSTER_ID_KEY, CLUSTER) + .put(CLIENT_NAME_KEY, "java-bigtable") + .put(METHOD_KEY, "Bigtable.MutateRow") + .put(STATUS_KEY, "OK") + .build(); - assertThat(retryCount.getValue()).isEqualTo(fakeService.getAttemptCounter().get() - 1); + long value = getAggregatedValue(metricData, expectedAttributes); + assertThat(value).isEqualTo(fakeService.getAttemptCounter().get() - 1); } @Test public void testMutateRowAttemptsTagValues() { - when(mockFactory.newTracer(any(), any(), any())) - .thenReturn( - new BuiltinMetricsTracer( - OperationType.Unary, SpanName.of("Bigtable", "MutateRow"), statsRecorderWrapper)); - stub.mutateRowCallable() - .call(RowMutation.create(TABLE_ID, "random-row").setCell("cf", "q", "value")); - - // Set a timeout to reduce flakiness of this test. BasicRetryingFuture will set - // attempt succeeded and set the response which will call complete() in AbstractFuture which - // calls releaseWaiters(). onOperationComplete() is called in TracerFinisher which will be - // called after the mutateRow call is returned. So there's a race between when the call returns - // and when the record() is called in onOperationCompletion(). 
- verify(statsRecorderWrapper, timeout(50).times(fakeService.getAttemptCounter().get())) - .recordAttempt(status.capture(), tableId.capture(), zone.capture(), cluster.capture()); - assertThat(zone.getAllValues()).containsExactly("global", "global", ZONE); - assertThat(cluster.getAllValues()).containsExactly("unspecified", "unspecified", CLUSTER); - assertThat(status.getAllValues()).containsExactly("UNAVAILABLE", "UNAVAILABLE", "OK"); - assertThat(tableId.getAllValues()).containsExactly(TABLE_ID, TABLE_ID, TABLE_ID); + .call(RowMutation.create(TABLE, "random-row").setCell("cf", "q", "value")); + + Collection allMetricData = metricReader.collectAllMetrics(); + MetricData metricData = getMetricData(allMetricData, ATTEMPT_LATENCIES_NAME); + + Attributes expected1 = + baseAttributes + .toBuilder() + .put(STATUS_KEY, "UNAVAILABLE") + .put(TABLE_ID_KEY, TABLE) + .put(ZONE_ID_KEY, "global") + .put(CLUSTER_ID_KEY, "unspecified") + .put(METHOD_KEY, "Bigtable.MutateRow") + .put(CLIENT_NAME_KEY, "java-bigtable") + .put(STREAMING_KEY, false) + .build(); + + Attributes expected2 = + baseAttributes + .toBuilder() + .put(STATUS_KEY, "OK") + .put(TABLE_ID_KEY, TABLE) + .put(ZONE_ID_KEY, ZONE) + .put(CLUSTER_ID_KEY, CLUSTER) + .put(METHOD_KEY, "Bigtable.MutateRow") + .put(CLIENT_NAME_KEY, "java-bigtable") + .put(STREAMING_KEY, false) + .build(); + + verifyAttributes(metricData, expected1); + verifyAttributes(metricData, expected2); } @Test public void testReadRowsAttemptsTagValues() { - when(mockFactory.newTracer(any(), any(), any())) - .thenReturn( - new BuiltinMetricsTracer( - OperationType.ServerStreaming, - SpanName.of("Bigtable", "ReadRows"), - statsRecorderWrapper)); - Lists.newArrayList(stub.readRowsCallable().call(Query.create("fake-table")).iterator()); - // Set a timeout to reduce flakiness of this test. BasicRetryingFuture will set - // attempt succeeded and set the response which will call complete() in AbstractFuture which - // calls releaseWaiters(). onOperationComplete() is called in TracerFinisher which will be - // called after the mutateRow call is returned. So there's a race between when the call returns - // and when the record() is called in onOperationCompletion(). 
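  // Editor's note: in both attempt-tag tests the failed first attempt(s) are expected to carry
  // zone "global" and cluster "unspecified". This is most likely because the client only learns
  // the actual zone and cluster from the location information returned with a successful
  // response, so attempts that fail before that arrives fall back to these default values.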
- verify(statsRecorderWrapper, timeout(50).times(fakeService.getAttemptCounter().get())) - .recordAttempt(status.capture(), tableId.capture(), zone.capture(), cluster.capture()); - assertThat(zone.getAllValues()).containsExactly("global", ZONE); - assertThat(cluster.getAllValues()).containsExactly("unspecified", CLUSTER); - assertThat(status.getAllValues()).containsExactly("UNAVAILABLE", "OK"); + Collection allMetricData = metricReader.collectAllMetrics(); + MetricData metricData = getMetricData(allMetricData, ATTEMPT_LATENCIES_NAME); + + Attributes expected1 = + baseAttributes + .toBuilder() + .put(STATUS_KEY, "UNAVAILABLE") + .put(TABLE_ID_KEY, TABLE) + .put(ZONE_ID_KEY, "global") + .put(CLUSTER_ID_KEY, "unspecified") + .put(METHOD_KEY, "Bigtable.ReadRows") + .put(CLIENT_NAME_KEY, "java-bigtable") + .put(STREAMING_KEY, true) + .build(); + + Attributes expected2 = + baseAttributes + .toBuilder() + .put(STATUS_KEY, "OK") + .put(TABLE_ID_KEY, TABLE) + .put(ZONE_ID_KEY, ZONE) + .put(CLUSTER_ID_KEY, CLUSTER) + .put(METHOD_KEY, "Bigtable.ReadRows") + .put(CLIENT_NAME_KEY, "java-bigtable") + .put(STREAMING_KEY, true) + .build(); + + verifyAttributes(metricData, expected1); + verifyAttributes(metricData, expected2); } @Test public void testBatchBlockingLatencies() throws InterruptedException { - when(mockFactory.newTracer(any(), any(), any())) - .thenReturn( - new BuiltinMetricsTracer( - OperationType.Unary, SpanName.of("Bigtable", "MutateRows"), statsRecorderWrapper)); - try (Batcher batcher = stub.newMutateRowsBatcher(TABLE_ID, null)) { + try (Batcher batcher = stub.newMutateRowsBatcher(TABLE, null)) { for (int i = 0; i < 6; i++) { batcher.add(RowMutationEntry.create("key").setCell("f", "q", "v")); } @@ -487,86 +556,100 @@ public void testBatchBlockingLatencies() throws InterruptedException { batcher.close(); int expectedNumRequests = 6 / batchElementCount; - ArgumentCaptor throttledTime = ArgumentCaptor.forClass(Long.class); - verify(statsRecorderWrapper, timeout(1000).times(expectedNumRequests)) - .putClientBlockingLatencies(throttledTime.capture()); - // After the first request is sent, batcher will block on add because of the server latency. - // Blocking latency should be around server latency. - assertThat(throttledTime.getAllValues().get(1)).isAtLeast(SERVER_LATENCY - 10); - assertThat(throttledTime.getAllValues().get(2)).isAtLeast(SERVER_LATENCY - 10); + Collection allMetricData = metricReader.collectAllMetrics(); + MetricData applicationLatency = getMetricData(allMetricData, CLIENT_BLOCKING_LATENCIES_NAME); - verify(statsRecorderWrapper, timeout(100).times(expectedNumRequests)) - .recordAttempt(status.capture(), tableId.capture(), zone.capture(), cluster.capture()); + Attributes expectedAttributes = + baseAttributes + .toBuilder() + .put(TABLE_ID_KEY, TABLE) + .put(ZONE_ID_KEY, ZONE) + .put(CLUSTER_ID_KEY, CLUSTER) + .put(METHOD_KEY, "Bigtable.MutateRows") + .put(CLIENT_NAME_KEY, "java-bigtable") + .build(); - assertThat(zone.getAllValues()).containsExactly(ZONE, ZONE, ZONE); - assertThat(cluster.getAllValues()).containsExactly(CLUSTER, CLUSTER, CLUSTER); + long value = getAggregatedValue(applicationLatency, expectedAttributes); + // After the first request is sent, batcher will block on add because of the server latency. + // Blocking latency should be around server latency. So each data point would be at least + // (SERVER_LATENCY - 10). 
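        // Editor's note (worked example, assuming getAggregatedValue returns the per-attempt mean
        // and batchElementCount is 2): the 6 mutations are split into 3 requests; the first add()
        // does not block, so the mean blocking latency is at least
        // (SERVER_LATENCY - 10) * (3 - 1) / 3, which is what the expression below computes.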
+ long expected = (SERVER_LATENCY - 10) * (expectedNumRequests - 1) / expectedNumRequests; + assertThat(value).isAtLeast(expected); } } @Test - public void testQueuedOnChannelServerStreamLatencies() throws InterruptedException { - when(mockFactory.newTracer(any(), any(), any())) - .thenReturn( - new BuiltinMetricsTracer( - OperationType.ServerStreaming, - SpanName.of("Bigtable", "ReadRows"), - statsRecorderWrapper)); - - stub.readRowsCallable().all().call(Query.create(TABLE_ID)); - - ArgumentCaptor blockedTime = ArgumentCaptor.forClass(Long.class); - - verify(statsRecorderWrapper, timeout(1000).times(fakeService.attemptCounter.get())) - .putClientBlockingLatencies(blockedTime.capture()); + public void testQueuedOnChannelServerStreamLatencies() { + stub.readRowsCallable().all().call(Query.create(TABLE)); + + Collection allMetricData = metricReader.collectAllMetrics(); + MetricData clientLatency = getMetricData(allMetricData, CLIENT_BLOCKING_LATENCIES_NAME); + + Attributes attributes = + baseAttributes + .toBuilder() + .put(TABLE_ID_KEY, TABLE) + .put(CLUSTER_ID_KEY, CLUSTER) + .put(ZONE_ID_KEY, ZONE) + .put(METHOD_KEY, "Bigtable.ReadRows") + .put(CLIENT_NAME_KEY, "java-bigtable") + .build(); - assertThat(blockedTime.getAllValues().get(1)).isAtLeast(CHANNEL_BLOCKING_LATENCY); + long value = getAggregatedValue(clientLatency, attributes); + assertThat(value).isAtLeast(CHANNEL_BLOCKING_LATENCY); } @Test - public void testQueuedOnChannelUnaryLatencies() throws InterruptedException { - when(mockFactory.newTracer(any(), any(), any())) - .thenReturn( - new BuiltinMetricsTracer( - OperationType.Unary, SpanName.of("Bigtable", "MutateRow"), statsRecorderWrapper)); - stub.mutateRowCallable().call(RowMutation.create(TABLE_ID, "a-key").setCell("f", "q", "v")); + public void testQueuedOnChannelUnaryLatencies() { + + stub.mutateRowCallable().call(RowMutation.create(TABLE, "a-key").setCell("f", "q", "v")); - ArgumentCaptor blockedTime = ArgumentCaptor.forClass(Long.class); + Collection allMetricData = metricReader.collectAllMetrics(); + MetricData clientLatency = getMetricData(allMetricData, CLIENT_BLOCKING_LATENCIES_NAME); - verify(statsRecorderWrapper, timeout(1000).times(fakeService.attemptCounter.get())) - .putClientBlockingLatencies(blockedTime.capture()); + Attributes attributes = + baseAttributes + .toBuilder() + .put(TABLE_ID_KEY, TABLE) + .put(CLUSTER_ID_KEY, CLUSTER) + .put(ZONE_ID_KEY, ZONE) + .put(METHOD_KEY, "Bigtable.MutateRow") + .put(CLIENT_NAME_KEY, "java-bigtable") + .build(); - assertThat(blockedTime.getAllValues().get(1)).isAtLeast(CHANNEL_BLOCKING_LATENCY); - assertThat(blockedTime.getAllValues().get(2)).isAtLeast(CHANNEL_BLOCKING_LATENCY); + long expected = CHANNEL_BLOCKING_LATENCY * 2 / 3; + long actual = getAggregatedValue(clientLatency, attributes); + assertThat(actual).isAtLeast(expected); } @Test public void testPermanentFailure() { - when(mockFactory.newTracer(any(), any(), any())) - .thenReturn( - new BuiltinMetricsTracer( - OperationType.ServerStreaming, - SpanName.of("Bigtable", "ReadRows"), - statsRecorderWrapper)); - try { Lists.newArrayList(stub.readRowsCallable().call(Query.create(BAD_TABLE_ID)).iterator()); Assert.fail("Request should throw not found error"); } catch (NotFoundException e) { } - ArgumentCaptor attemptLatency = ArgumentCaptor.forClass(Long.class); - ArgumentCaptor operationLatency = ArgumentCaptor.forClass(Long.class); + Collection allMetricData = metricReader.collectAllMetrics(); + MetricData attemptLatency = getMetricData(allMetricData, 
ATTEMPT_LATENCIES_NAME); + + Attributes expected = + baseAttributes + .toBuilder() + .put(STATUS_KEY, "NOT_FOUND") + .put(TABLE_ID_KEY, BAD_TABLE_ID) + .put(CLUSTER_ID_KEY, "unspecified") + .put(ZONE_ID_KEY, "global") + .put(STREAMING_KEY, true) + .put(METHOD_KEY, "Bigtable.ReadRows") + .put(CLIENT_NAME_KEY, "java-bigtable") + .build(); - verify(statsRecorderWrapper, timeout(50)).putAttemptLatencies(attemptLatency.capture()); - verify(statsRecorderWrapper, timeout(50)).putOperationLatencies(operationLatency.capture()); - verify(statsRecorderWrapper, timeout(50)) - .recordAttempt(status.capture(), tableId.capture(), zone.capture(), cluster.capture()); + verifyAttributes(attemptLatency, expected); - assertThat(status.getValue()).isEqualTo("NOT_FOUND"); - assertThat(tableId.getValue()).isEqualTo(BAD_TABLE_ID); - assertThat(cluster.getValue()).isEqualTo("unspecified"); - assertThat(zone.getValue()).isEqualTo("global"); + MetricData opLatency = getMetricData(allMetricData, OPERATION_LATENCIES_NAME); + verifyAttributes(opLatency, expected); } private static class FakeService extends BigtableGrpc.BigtableImplBase {