COMMON_ATTRIBUTES =
+ ImmutableSet.of(
+ PROJECT_ID_KEY,
+ INSTANCE_ID_KEY,
+ LOCATION_ID_KEY,
+ INSTANCE_CONFIG_ID_KEY,
+ CLIENT_UID_KEY,
+ METHOD_KEY,
+ STATUS_KEY,
+ DATABASE_KEY,
+ CLIENT_NAME_KEY,
+ DIRECT_PATH_ENABLED_KEY,
+ DIRECT_PATH_USED_KEY);
+}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerCloudMonitoringExporter.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerCloudMonitoringExporter.java
new file mode 100644
index 00000000000..51dc890902c
--- /dev/null
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerCloudMonitoringExporter.java
@@ -0,0 +1,236 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner;
+
+import static com.google.cloud.spanner.BuiltInMetricsConstant.SPANNER_METRICS;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.rpc.PermissionDeniedException;
+import com.google.auth.Credentials;
+import com.google.cloud.monitoring.v3.MetricServiceClient;
+import com.google.cloud.monitoring.v3.MetricServiceSettings;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.monitoring.v3.CreateTimeSeriesRequest;
+import com.google.monitoring.v3.ProjectName;
+import com.google.monitoring.v3.TimeSeries;
+import com.google.protobuf.Empty;
+import io.opentelemetry.sdk.common.CompletableResultCode;
+import io.opentelemetry.sdk.metrics.InstrumentType;
+import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.export.MetricExporter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.threeten.bp.Duration;
+
+/**
+ * Spanner Cloud Monitoring OpenTelemetry Exporter.
+ *
+ * The exporter will look for all spanner owned metrics under spanner.googleapis.com
+ * instrumentation scope and upload it via the Google Cloud Monitoring API.
+ */
+class SpannerCloudMonitoringExporter implements MetricExporter {
+
+ private static final Logger logger =
+ Logger.getLogger(SpannerCloudMonitoringExporter.class.getName());
+
+ // This system property can be used to override the monitoring endpoint
+ // to a different environment. It's meant for internal testing only.
+ private static final String MONITORING_ENDPOINT =
+ MoreObjects.firstNonNull(
+ System.getProperty("spanner.test-monitoring-endpoint"),
+ MetricServiceSettings.getDefaultEndpoint());
+
+ // This the quota limit from Cloud Monitoring. More details in
+ // https://cloud.google.com/monitoring/quotas#custom_metrics_quotas.
+ private static final int EXPORT_BATCH_SIZE_LIMIT = 200;
+ private final AtomicBoolean spannerExportFailureLogged = new AtomicBoolean(false);
+ private CompletableResultCode lastExportCode;
+ private final MetricServiceClient client;
+ private final String spannerProjectId;
+
+ static SpannerCloudMonitoringExporter create(String projectId, @Nullable Credentials credentials)
+ throws IOException {
+ MetricServiceSettings.Builder settingsBuilder = MetricServiceSettings.newBuilder();
+ CredentialsProvider credentialsProvider;
+ if (credentials == null) {
+ credentialsProvider = NoCredentialsProvider.create();
+ } else {
+ credentialsProvider = FixedCredentialsProvider.create(credentials);
+ }
+ settingsBuilder.setCredentialsProvider(credentialsProvider);
+ settingsBuilder.setEndpoint(MONITORING_ENDPOINT);
+
+ org.threeten.bp.Duration timeout = Duration.ofMinutes(1);
+ // TODO: createServiceTimeSeries needs special handling if the request failed. Leaving
+ // it as not retried for now.
+ settingsBuilder.createServiceTimeSeriesSettings().setSimpleTimeoutNoRetries(timeout);
+
+ return new SpannerCloudMonitoringExporter(
+ projectId, MetricServiceClient.create(settingsBuilder.build()));
+ }
+
+ @VisibleForTesting
+ SpannerCloudMonitoringExporter(String projectId, MetricServiceClient client) {
+ this.client = client;
+ this.spannerProjectId = projectId;
+ }
+
+ @Override
+ public CompletableResultCode export(Collection collection) {
+ if (client.isShutdown()) {
+ logger.log(Level.WARNING, "Exporter is shut down");
+ return CompletableResultCode.ofFailure();
+ }
+
+ this.lastExportCode = exportSpannerClientMetrics(collection);
+ return lastExportCode;
+ }
+
+ /** Export client built in metrics */
+ private CompletableResultCode exportSpannerClientMetrics(Collection collection) {
+ // Filter spanner metrics
+ List spannerMetricData =
+ collection.stream()
+ .filter(md -> SPANNER_METRICS.contains(md.getName()))
+ .collect(Collectors.toList());
+
+ // Skips exporting if there's none
+ if (spannerMetricData.isEmpty()) {
+ return CompletableResultCode.ofSuccess();
+ }
+
+ // Verifies metrics project id is the same as the spanner project id set on this client
+ if (!spannerMetricData.stream()
+ .flatMap(metricData -> metricData.getData().getPoints().stream())
+ .allMatch(
+ pd -> spannerProjectId.equals(SpannerCloudMonitoringExporterUtils.getProjectId(pd)))) {
+ logger.log(Level.WARNING, "Metric data has a different projectId. Skipping export.");
+ return CompletableResultCode.ofFailure();
+ }
+
+ List spannerTimeSeries;
+ try {
+ spannerTimeSeries =
+ SpannerCloudMonitoringExporterUtils.convertToSpannerTimeSeries(spannerMetricData);
+ } catch (Throwable e) {
+ logger.log(
+ Level.WARNING,
+ "Failed to convert spanner metric data to cloud monitoring timeseries.",
+ e);
+ return CompletableResultCode.ofFailure();
+ }
+
+ ProjectName projectName = ProjectName.of(spannerProjectId);
+
+ ApiFuture> futureList = exportTimeSeriesInBatch(projectName, spannerTimeSeries);
+
+ CompletableResultCode spannerExportCode = new CompletableResultCode();
+ ApiFutures.addCallback(
+ futureList,
+ new ApiFutureCallback>() {
+ @Override
+ public void onFailure(Throwable throwable) {
+ if (spannerExportFailureLogged.compareAndSet(false, true)) {
+ String msg = "createServiceTimeSeries request failed for spanner metrics.";
+ if (throwable instanceof PermissionDeniedException) {
+ // TODO: Add the link of public documentation when available in the log message.
+ msg +=
+ String.format(
+ " Need monitoring metric writer permission on project=%s.",
+ projectName.getProject());
+ }
+ logger.log(Level.WARNING, msg, throwable);
+ }
+ spannerExportCode.fail();
+ }
+
+ @Override
+ public void onSuccess(List empty) {
+ // When an export succeeded reset the export failure flag to false so if there's a
+ // transient failure it'll be logged.
+ spannerExportFailureLogged.set(false);
+ spannerExportCode.succeed();
+ }
+ },
+ MoreExecutors.directExecutor());
+
+ return spannerExportCode;
+ }
+
+ private ApiFuture> exportTimeSeriesInBatch(
+ ProjectName projectName, List timeSeries) {
+ List> batchResults = new ArrayList<>();
+
+ for (List batch : Iterables.partition(timeSeries, EXPORT_BATCH_SIZE_LIMIT)) {
+ CreateTimeSeriesRequest req =
+ CreateTimeSeriesRequest.newBuilder()
+ .setName(projectName.toString())
+ .addAllTimeSeries(batch)
+ .build();
+ batchResults.add(this.client.createServiceTimeSeriesCallable().futureCall(req));
+ }
+
+ return ApiFutures.allAsList(batchResults);
+ }
+
+ @Override
+ public CompletableResultCode flush() {
+ return CompletableResultCode.ofSuccess();
+ }
+
+ @Override
+ public CompletableResultCode shutdown() {
+ if (client.isShutdown()) {
+ logger.log(Level.WARNING, "shutdown is called multiple times");
+ return CompletableResultCode.ofSuccess();
+ }
+ CompletableResultCode shutdownResult = new CompletableResultCode();
+ try {
+ client.shutdown();
+ shutdownResult.succeed();
+ } catch (Throwable e) {
+ logger.log(Level.WARNING, "failed to shutdown the monitoring client", e);
+ shutdownResult.fail();
+ }
+ return shutdownResult;
+ }
+
+ /**
+ * For Google Cloud Monitoring always return CUMULATIVE to keep track of the cumulative value of a
+ * metric over time.
+ */
+ @Override
+ public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
+ return AggregationTemporality.CUMULATIVE;
+ }
+}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerCloudMonitoringExporterUtils.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerCloudMonitoringExporterUtils.java
new file mode 100644
index 00000000000..a6d1e29d587
--- /dev/null
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerCloudMonitoringExporterUtils.java
@@ -0,0 +1,211 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner;
+
+import static com.google.api.MetricDescriptor.MetricKind.CUMULATIVE;
+import static com.google.api.MetricDescriptor.MetricKind.GAUGE;
+import static com.google.api.MetricDescriptor.MetricKind.UNRECOGNIZED;
+import static com.google.api.MetricDescriptor.ValueType.DISTRIBUTION;
+import static com.google.api.MetricDescriptor.ValueType.DOUBLE;
+import static com.google.api.MetricDescriptor.ValueType.INT64;
+import static com.google.cloud.spanner.BuiltInMetricsConstant.GAX_METER_NAME;
+import static com.google.cloud.spanner.BuiltInMetricsConstant.PROJECT_ID_KEY;
+import static com.google.cloud.spanner.BuiltInMetricsConstant.SPANNER_PROMOTED_RESOURCE_LABELS;
+import static com.google.cloud.spanner.BuiltInMetricsConstant.SPANNER_RESOURCE_TYPE;
+
+import com.google.api.Distribution;
+import com.google.api.Distribution.BucketOptions;
+import com.google.api.Distribution.BucketOptions.Explicit;
+import com.google.api.Metric;
+import com.google.api.MetricDescriptor.MetricKind;
+import com.google.api.MetricDescriptor.ValueType;
+import com.google.api.MonitoredResource;
+import com.google.monitoring.v3.Point;
+import com.google.monitoring.v3.TimeInterval;
+import com.google.monitoring.v3.TimeSeries;
+import com.google.monitoring.v3.TypedValue;
+import com.google.protobuf.util.Timestamps;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
+import io.opentelemetry.sdk.metrics.data.DoublePointData;
+import io.opentelemetry.sdk.metrics.data.HistogramData;
+import io.opentelemetry.sdk.metrics.data.HistogramPointData;
+import io.opentelemetry.sdk.metrics.data.LongPointData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.metrics.data.PointData;
+import io.opentelemetry.sdk.metrics.data.SumData;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+class SpannerCloudMonitoringExporterUtils {
+
+ private static final Logger logger =
+ Logger.getLogger(SpannerCloudMonitoringExporterUtils.class.getName());
+
+ private SpannerCloudMonitoringExporterUtils() {}
+
+ static String getProjectId(PointData pointData) {
+ return pointData.getAttributes().get(PROJECT_ID_KEY);
+ }
+
+ static List convertToSpannerTimeSeries(List collection) {
+ List allTimeSeries = new ArrayList<>();
+
+ for (MetricData metricData : collection) {
+ // Get common metrics data from GAX library
+ if (!metricData.getInstrumentationScopeInfo().getName().equals(GAX_METER_NAME)) {
+ // Filter out metric data for instruments that are not part of the spanner metrics list
+ continue;
+ }
+ metricData.getData().getPoints().stream()
+ .map(pointData -> convertPointToSpannerTimeSeries(metricData, pointData))
+ .forEach(allTimeSeries::add);
+ }
+
+ return allTimeSeries;
+ }
+
+ private static TimeSeries convertPointToSpannerTimeSeries(
+ MetricData metricData, PointData pointData) {
+ TimeSeries.Builder builder =
+ TimeSeries.newBuilder()
+ .setMetricKind(convertMetricKind(metricData))
+ .setValueType(convertValueType(metricData.getType()));
+ Metric.Builder metricBuilder = Metric.newBuilder().setType(metricData.getName());
+
+ Attributes attributes = pointData.getAttributes();
+ MonitoredResource.Builder monitoredResourceBuilder =
+ MonitoredResource.newBuilder().setType(SPANNER_RESOURCE_TYPE);
+
+ for (AttributeKey> key : attributes.asMap().keySet()) {
+ if (SPANNER_PROMOTED_RESOURCE_LABELS.contains(key)) {
+ monitoredResourceBuilder.putLabels(key.getKey(), String.valueOf(attributes.get(key)));
+ } else {
+ metricBuilder.putLabels(key.getKey(), String.valueOf(attributes.get(key)));
+ }
+ }
+
+ builder.setResource(monitoredResourceBuilder.build());
+ builder.setMetric(metricBuilder.build());
+
+ TimeInterval timeInterval =
+ TimeInterval.newBuilder()
+ .setStartTime(Timestamps.fromNanos(pointData.getStartEpochNanos()))
+ .setEndTime(Timestamps.fromNanos(pointData.getEpochNanos()))
+ .build();
+
+ builder.addPoints(createPoint(metricData.getType(), pointData, timeInterval));
+
+ return builder.build();
+ }
+
+ private static MetricKind convertMetricKind(MetricData metricData) {
+ switch (metricData.getType()) {
+ case HISTOGRAM:
+ case EXPONENTIAL_HISTOGRAM:
+ return convertHistogramType(metricData.getHistogramData());
+ case LONG_GAUGE:
+ case DOUBLE_GAUGE:
+ return GAUGE;
+ case LONG_SUM:
+ return convertSumDataType(metricData.getLongSumData());
+ case DOUBLE_SUM:
+ return convertSumDataType(metricData.getDoubleSumData());
+ default:
+ return UNRECOGNIZED;
+ }
+ }
+
+ private static MetricKind convertHistogramType(HistogramData histogramData) {
+ if (histogramData.getAggregationTemporality() == AggregationTemporality.CUMULATIVE) {
+ return CUMULATIVE;
+ }
+ return UNRECOGNIZED;
+ }
+
+ private static MetricKind convertSumDataType(SumData> sum) {
+ if (!sum.isMonotonic()) {
+ return GAUGE;
+ }
+ if (sum.getAggregationTemporality() == AggregationTemporality.CUMULATIVE) {
+ return CUMULATIVE;
+ }
+ return UNRECOGNIZED;
+ }
+
+ private static ValueType convertValueType(MetricDataType metricDataType) {
+ switch (metricDataType) {
+ case LONG_GAUGE:
+ case LONG_SUM:
+ return INT64;
+ case DOUBLE_GAUGE:
+ case DOUBLE_SUM:
+ return DOUBLE;
+ case HISTOGRAM:
+ case EXPONENTIAL_HISTOGRAM:
+ return DISTRIBUTION;
+ default:
+ return ValueType.UNRECOGNIZED;
+ }
+ }
+
+ private static Point createPoint(
+ MetricDataType type, PointData pointData, TimeInterval timeInterval) {
+ Point.Builder builder = Point.newBuilder().setInterval(timeInterval);
+ switch (type) {
+ case HISTOGRAM:
+ case EXPONENTIAL_HISTOGRAM:
+ return builder
+ .setValue(
+ TypedValue.newBuilder()
+ .setDistributionValue(convertHistogramData((HistogramPointData) pointData))
+ .build())
+ .build();
+ case DOUBLE_GAUGE:
+ case DOUBLE_SUM:
+ return builder
+ .setValue(
+ TypedValue.newBuilder()
+ .setDoubleValue(((DoublePointData) pointData).getValue())
+ .build())
+ .build();
+ case LONG_GAUGE:
+ case LONG_SUM:
+ return builder
+ .setValue(TypedValue.newBuilder().setInt64Value(((LongPointData) pointData).getValue()))
+ .build();
+ default:
+ logger.log(Level.WARNING, "unsupported metric type");
+ return builder.build();
+ }
+ }
+
+ private static Distribution convertHistogramData(HistogramPointData pointData) {
+ return Distribution.newBuilder()
+ .setCount(pointData.getCount())
+ .setMean(pointData.getCount() == 0L ? 0.0D : pointData.getSum() / pointData.getCount())
+ .setBucketOptions(
+ BucketOptions.newBuilder()
+ .setExplicitBuckets(Explicit.newBuilder().addAllBounds(pointData.getBoundaries())))
+ .addAllBucketCounts(pointData.getCounts())
+ .build();
+ }
+}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerCloudMonitoringExporterTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerCloudMonitoringExporterTest.java
new file mode 100644
index 00000000000..378db5b6b0d
--- /dev/null
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerCloudMonitoringExporterTest.java
@@ -0,0 +1,342 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner;
+
+import static com.google.cloud.spanner.BuiltInMetricsConstant.CLIENT_NAME_KEY;
+import static com.google.cloud.spanner.BuiltInMetricsConstant.CLIENT_UID_KEY;
+import static com.google.cloud.spanner.BuiltInMetricsConstant.DATABASE_KEY;
+import static com.google.cloud.spanner.BuiltInMetricsConstant.DIRECT_PATH_ENABLED_KEY;
+import static com.google.cloud.spanner.BuiltInMetricsConstant.DIRECT_PATH_USED_KEY;
+import static com.google.cloud.spanner.BuiltInMetricsConstant.GAX_METER_NAME;
+import static com.google.cloud.spanner.BuiltInMetricsConstant.INSTANCE_CONFIG_ID_KEY;
+import static com.google.cloud.spanner.BuiltInMetricsConstant.INSTANCE_ID_KEY;
+import static com.google.cloud.spanner.BuiltInMetricsConstant.LOCATION_ID_KEY;
+import static com.google.cloud.spanner.BuiltInMetricsConstant.OPERATION_COUNT_NAME;
+import static com.google.cloud.spanner.BuiltInMetricsConstant.OPERATION_LATENCIES_NAME;
+import static com.google.cloud.spanner.BuiltInMetricsConstant.PROJECT_ID_KEY;
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.api.Distribution;
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.cloud.monitoring.v3.MetricServiceClient;
+import com.google.cloud.monitoring.v3.stub.MetricServiceStub;
+import com.google.common.collect.ImmutableList;
+import com.google.monitoring.v3.CreateTimeSeriesRequest;
+import com.google.monitoring.v3.TimeSeries;
+import com.google.protobuf.Empty;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
+import io.opentelemetry.sdk.metrics.InstrumentType;
+import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
+import io.opentelemetry.sdk.metrics.data.HistogramPointData;
+import io.opentelemetry.sdk.metrics.data.LongPointData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramPointData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData;
+import io.opentelemetry.sdk.resources.Resource;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+public class SpannerCloudMonitoringExporterTest {
+
+ private static final String projectId = "fake-project";
+ private static final String instanceId = "fake-instance";
+ private static final String locationId = "global";
+ private static final String databaseId = "fake-database";
+ private static final String clientName = "spanner-java";
+ private static final String instanceConfigId = "fake-instance-config-id";
+
+ @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule();
+
+ @Mock private MetricServiceStub mockMetricServiceStub;
+ private MetricServiceClient fakeMetricServiceClient;
+ private SpannerCloudMonitoringExporter exporter;
+
+ private Attributes attributes;
+ private Resource resource;
+ private InstrumentationScopeInfo scope;
+
+ @Before
+ public void setUp() {
+ fakeMetricServiceClient = new FakeMetricServiceClient(mockMetricServiceStub);
+ exporter = new SpannerCloudMonitoringExporter(projectId, fakeMetricServiceClient);
+
+ attributes =
+ Attributes.builder()
+ .put(PROJECT_ID_KEY, projectId)
+ .put(INSTANCE_ID_KEY, instanceId)
+ .put(LOCATION_ID_KEY, locationId)
+ .put(INSTANCE_CONFIG_ID_KEY, instanceConfigId)
+ .put(DATABASE_KEY, databaseId)
+ .put(CLIENT_NAME_KEY, clientName)
+ .put(String.valueOf(DIRECT_PATH_ENABLED_KEY), true)
+ .put(String.valueOf(DIRECT_PATH_USED_KEY), true)
+ .build();
+
+ resource = Resource.create(Attributes.empty());
+
+ scope = InstrumentationScopeInfo.create(GAX_METER_NAME);
+ }
+
+ @After
+ public void tearDown() {}
+
+ @Test
+ public void testExportingSumData() {
+ ArgumentCaptor argumentCaptor =
+ ArgumentCaptor.forClass(CreateTimeSeriesRequest.class);
+
+ UnaryCallable mockCallable = Mockito.mock(UnaryCallable.class);
+ Mockito.when(mockMetricServiceStub.createServiceTimeSeriesCallable()).thenReturn(mockCallable);
+ ApiFuture future = ApiFutures.immediateFuture(Empty.getDefaultInstance());
+ Mockito.when(mockCallable.futureCall(argumentCaptor.capture())).thenReturn(future);
+
+ long fakeValue = 11L;
+
+ long startEpoch = 10;
+ long endEpoch = 15;
+ LongPointData longPointData =
+ ImmutableLongPointData.create(startEpoch, endEpoch, attributes, fakeValue);
+
+ MetricData longData =
+ ImmutableMetricData.createLongSum(
+ resource,
+ scope,
+ "spanner.googleapis.com/internal/client/" + OPERATION_COUNT_NAME,
+ "description",
+ "1",
+ ImmutableSumData.create(
+ true, AggregationTemporality.CUMULATIVE, ImmutableList.of(longPointData)));
+
+ exporter.export(Arrays.asList(longData));
+
+ CreateTimeSeriesRequest request = argumentCaptor.getValue();
+
+ assertThat(request.getTimeSeriesList()).hasSize(1);
+
+ TimeSeries timeSeries = request.getTimeSeriesList().get(0);
+
+ assertThat(timeSeries.getResource().getLabelsMap())
+ .containsExactly(
+ PROJECT_ID_KEY.getKey(), projectId,
+ INSTANCE_ID_KEY.getKey(), instanceId,
+ LOCATION_ID_KEY.getKey(), locationId,
+ INSTANCE_CONFIG_ID_KEY.getKey(), instanceConfigId);
+
+ assertThat(timeSeries.getResource().getLabelsMap()).hasSize(4);
+
+ assertThat(timeSeries.getMetric().getLabelsMap())
+ .containsExactly(
+ DATABASE_KEY.getKey(),
+ databaseId,
+ CLIENT_NAME_KEY.getKey(),
+ clientName,
+ DIRECT_PATH_ENABLED_KEY.getKey(),
+ "true",
+ DIRECT_PATH_USED_KEY.getKey(),
+ "true");
+ assertThat(timeSeries.getMetric().getLabelsMap()).hasSize(4);
+
+ assertThat(timeSeries.getPoints(0).getValue().getInt64Value()).isEqualTo(fakeValue);
+ assertThat(timeSeries.getPoints(0).getInterval().getStartTime().getNanos())
+ .isEqualTo(startEpoch);
+ assertThat(timeSeries.getPoints(0).getInterval().getEndTime().getNanos()).isEqualTo(endEpoch);
+ }
+
+ @Test
+ public void testExportingHistogramData() {
+ ArgumentCaptor argumentCaptor =
+ ArgumentCaptor.forClass(CreateTimeSeriesRequest.class);
+
+ UnaryCallable mockCallable = mock(UnaryCallable.class);
+ when(mockMetricServiceStub.createServiceTimeSeriesCallable()).thenReturn(mockCallable);
+ ApiFuture future = ApiFutures.immediateFuture(Empty.getDefaultInstance());
+ when(mockCallable.futureCall(argumentCaptor.capture())).thenReturn(future);
+
+ long startEpoch = 10;
+ long endEpoch = 15;
+ HistogramPointData histogramPointData =
+ ImmutableHistogramPointData.create(
+ startEpoch,
+ endEpoch,
+ attributes,
+ 3d,
+ true,
+ 1d, // min
+ true,
+ 2d, // max
+ Arrays.asList(1.0),
+ Arrays.asList(1L, 2L));
+
+ MetricData histogramData =
+ ImmutableMetricData.createDoubleHistogram(
+ resource,
+ scope,
+ "spanner.googleapis.com/internal/client/" + OPERATION_LATENCIES_NAME,
+ "description",
+ "ms",
+ ImmutableHistogramData.create(
+ AggregationTemporality.CUMULATIVE, ImmutableList.of(histogramPointData)));
+
+ exporter.export(Arrays.asList(histogramData));
+
+ CreateTimeSeriesRequest request = argumentCaptor.getValue();
+
+ assertThat(request.getTimeSeriesList()).hasSize(1);
+
+ TimeSeries timeSeries = request.getTimeSeriesList().get(0);
+
+ assertThat(timeSeries.getResource().getLabelsMap()).hasSize(4);
+ assertThat(timeSeries.getResource().getLabelsMap())
+ .containsExactly(
+ PROJECT_ID_KEY.getKey(), projectId,
+ INSTANCE_ID_KEY.getKey(), instanceId,
+ LOCATION_ID_KEY.getKey(), locationId,
+ INSTANCE_CONFIG_ID_KEY.getKey(), instanceConfigId);
+
+ assertThat(timeSeries.getMetric().getLabelsMap()).hasSize(4);
+ assertThat(timeSeries.getMetric().getLabelsMap())
+ .containsExactly(
+ DATABASE_KEY.getKey(),
+ databaseId,
+ CLIENT_NAME_KEY.getKey(),
+ clientName,
+ DIRECT_PATH_ENABLED_KEY.getKey(),
+ "true",
+ DIRECT_PATH_USED_KEY.getKey(),
+ "true");
+
+ Distribution distribution = timeSeries.getPoints(0).getValue().getDistributionValue();
+ assertThat(distribution.getCount()).isEqualTo(3);
+ assertThat(timeSeries.getPoints(0).getInterval().getStartTime().getNanos())
+ .isEqualTo(startEpoch);
+ assertThat(timeSeries.getPoints(0).getInterval().getEndTime().getNanos()).isEqualTo(endEpoch);
+ }
+
+ @Test
+ public void testExportingSumDataInBatches() {
+ ArgumentCaptor argumentCaptor =
+ ArgumentCaptor.forClass(CreateTimeSeriesRequest.class);
+
+ UnaryCallable mockCallable = mock(UnaryCallable.class);
+ when(mockMetricServiceStub.createServiceTimeSeriesCallable()).thenReturn(mockCallable);
+ ApiFuture future = ApiFutures.immediateFuture(Empty.getDefaultInstance());
+ when(mockCallable.futureCall(argumentCaptor.capture())).thenReturn(future);
+
+ long startEpoch = 10;
+ long endEpoch = 15;
+
+ Collection toExport = new ArrayList<>();
+ for (int i = 0; i < 250; i++) {
+ LongPointData longPointData =
+ ImmutableLongPointData.create(
+ startEpoch,
+ endEpoch,
+ attributes.toBuilder().put(CLIENT_UID_KEY, "client_uid" + i).build(),
+ i);
+
+ MetricData longData =
+ ImmutableMetricData.createLongSum(
+ resource,
+ scope,
+ "spanner.googleapis.com/internal/client/" + OPERATION_COUNT_NAME,
+ "description",
+ "1",
+ ImmutableSumData.create(
+ true, AggregationTemporality.CUMULATIVE, ImmutableList.of(longPointData)));
+ toExport.add(longData);
+ }
+
+ exporter.export(toExport);
+
+ assertThat(argumentCaptor.getAllValues()).hasSize(2);
+ CreateTimeSeriesRequest firstRequest = argumentCaptor.getAllValues().get(0);
+ CreateTimeSeriesRequest secondRequest = argumentCaptor.getAllValues().get(1);
+
+ assertThat(firstRequest.getTimeSeriesList()).hasSize(200);
+ assertThat(secondRequest.getTimeSeriesList()).hasSize(50);
+
+ for (int i = 0; i < 250; i++) {
+ TimeSeries timeSeries;
+ if (i < 200) {
+ timeSeries = firstRequest.getTimeSeriesList().get(i);
+ } else {
+ timeSeries = secondRequest.getTimeSeriesList().get(i - 200);
+ }
+
+ assertThat(timeSeries.getResource().getLabelsMap()).hasSize(4);
+ assertThat(timeSeries.getResource().getLabelsMap())
+ .containsExactly(
+ PROJECT_ID_KEY.getKey(), projectId,
+ INSTANCE_ID_KEY.getKey(), instanceId,
+ LOCATION_ID_KEY.getKey(), locationId,
+ INSTANCE_CONFIG_ID_KEY.getKey(), instanceConfigId);
+
+ assertThat(timeSeries.getMetric().getLabelsMap()).hasSize(5);
+ assertThat(timeSeries.getMetric().getLabelsMap())
+ .containsExactly(
+ DATABASE_KEY.getKey(),
+ databaseId,
+ CLIENT_NAME_KEY.getKey(),
+ clientName,
+ DIRECT_PATH_ENABLED_KEY.getKey(),
+ "true",
+ DIRECT_PATH_USED_KEY.getKey(),
+ "true",
+ CLIENT_UID_KEY.getKey(),
+ "client_uid" + i);
+
+ assertThat(timeSeries.getPoints(0).getValue().getInt64Value()).isEqualTo(i);
+ assertThat(timeSeries.getPoints(0).getInterval().getStartTime().getNanos())
+ .isEqualTo(startEpoch);
+ assertThat(timeSeries.getPoints(0).getInterval().getEndTime().getNanos()).isEqualTo(endEpoch);
+ }
+ }
+
+ @Test
+ public void getAggregationTemporality() throws IOException {
+ SpannerCloudMonitoringExporter actualExporter =
+ SpannerCloudMonitoringExporter.create(projectId, null);
+ assertThat(actualExporter.getAggregationTemporality(InstrumentType.COUNTER))
+ .isEqualTo(AggregationTemporality.CUMULATIVE);
+ }
+
+ private static class FakeMetricServiceClient extends MetricServiceClient {
+
+ protected FakeMetricServiceClient(MetricServiceStub stub) {
+ super(stub);
+ }
+ }
+}