From 056d3bb9ff76f5f2d6c45128f693d696cbd4da86 Mon Sep 17 00:00:00 2001 From: Spencer Nelson Date: Fri, 28 Jul 2023 13:30:05 -0700 Subject: [PATCH] Don't collect empty metrics from storage If a metric has no datapoints, don't send it on. That can foul up exporters. --- .../_internal/metrics_encoder/__init__.py | 23 +++++++++---- .../sdk/metrics/_internal/export/__init__.py | 9 +++--- .../metrics/_internal/measurement_consumer.py | 8 ++--- .../_internal/metric_reader_storage.py | 32 +++++++++++-------- .../test_disable_default_views.py | 8 +---- .../metrics/test_metric_reader_storage.py | 10 +----- 6 files changed, 47 insertions(+), 43 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/metrics_encoder/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/metrics_encoder/__init__.py index d604786108c..48d61b1bfdc 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/metrics_encoder/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/metrics_encoder/__init__.py @@ -46,7 +46,7 @@ Sum, ExponentialHistogram as ExponentialHistogramType, ) -from typing import Dict +from typing import Dict, Optional from opentelemetry.proto.resource.v1.resource_pb2 import ( Resource as PB2Resource, ) @@ -154,7 +154,7 @@ def _common_configuration( ) -def encode_metrics(data: MetricsData) -> ExportMetricsServiceRequest: +def encode_metrics(data: MetricsData) -> Optional[ExportMetricsServiceRequest]: resource_metrics_dict = {} for resource_metrics in data.resource_metrics: @@ -165,8 +165,6 @@ def encode_metrics(data: MetricsData) -> ExportMetricsServiceRequest: # associated with an unique resource. scope_metrics_dict = {} - resource_metrics_dict[resource] = scope_metrics_dict - for scope_metrics in resource_metrics.scope_metrics: instrumentation_scope = scope_metrics.scope @@ -181,9 +179,10 @@ def encode_metrics(data: MetricsData) -> ExportMetricsServiceRequest: ) ) - scope_metrics_dict[instrumentation_scope] = pb2_scope_metrics - for metric in scope_metrics.metrics: + if len(metric.data.data_points) == 0: + continue + pb2_metric = pb2.Metric( name=metric.name, description=metric.description, @@ -301,6 +300,16 @@ def encode_metrics(data: MetricsData) -> ExportMetricsServiceRequest: pb2_scope_metrics.metrics.append(pb2_metric) + if len(pb2_scope_metrics.metrics) > 0: + # It's possible that all the metrics have zero + # datapoints. If so, we'll have no metrics at all to + # send. + scope_metrics_dict[instrumentation_scope] = pb2_scope_metrics + + if len(scope_metrics_dict) > 0: + # Analogous to the above, it's possible that all scopes are empty. + resource_metrics_dict[resource] = scope_metrics_dict + resource_data = [] for ( sdk_resource, @@ -315,4 +324,6 @@ def encode_metrics(data: MetricsData) -> ExportMetricsServiceRequest: ) ) resource_metrics = resource_data + if len(resource_metrics) == 0: + return None return ExportMetricsServiceRequest(resource_metrics=resource_metrics) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py index 5bd94d5aacc..1e66f8db151 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py @@ -106,7 +106,7 @@ def export( """Exports a batch of telemetry data. Args: - metrics: The list of `opentelemetry.sdk.metrics.export.Metric` objects to be exported + metrics: The `opentelemetry.sdk.metrics.export.MetricsData` object to be exported Returns: The result of the export @@ -217,7 +217,7 @@ def __init__( "opentelemetry.sdk.metrics.export.MetricReader", AggregationTemporality, ], - Iterable["opentelemetry.sdk.metrics.export.Metric"], + Optional["opentelemetry.sdk.metrics.export.MetricsData"], ] = None self._instrument_class_temporality = { @@ -306,7 +306,8 @@ def __init__( @final def collect(self, timeout_millis: float = 10_000) -> None: """Collects the metrics from the internal SDK state and - invokes the `_receive_metrics` with the collection. + invokes the `_receive_metrics` with the collection if it + contains any data. Args: timeout_millis: Amount of time in milliseconds before this function @@ -335,7 +336,7 @@ def _set_collect_callback( "opentelemetry.sdk.metrics.export.MetricReader", AggregationTemporality, ], - Iterable["opentelemetry.sdk.metrics.export.Metric"], + Optional["opentelemetry.sdk.metrics.export.MetricsData"], ], ) -> None: """This function is internal to the SDK. It should not be called or overridden by users""" diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py index 9daf1eff461..1b5fc5e9c85 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py @@ -17,7 +17,7 @@ from abc import ABC, abstractmethod from threading import Lock from time import time_ns -from typing import Iterable, List, Mapping +from typing import Iterable, List, Mapping, Optional # This kind of import is needed to avoid Sphinx errors. import opentelemetry.sdk.metrics @@ -29,7 +29,7 @@ from opentelemetry.sdk.metrics._internal.metric_reader_storage import ( MetricReaderStorage, ) -from opentelemetry.sdk.metrics._internal.point import Metric +from opentelemetry.sdk.metrics._internal.point import MetricsData class MeasurementConsumer(ABC): @@ -51,7 +51,7 @@ def collect( self, metric_reader: "opentelemetry.sdk.metrics.MetricReader", timeout_millis: float = 10_000, - ) -> Iterable[Metric]: + ) -> Optional[MetricsData]: pass @@ -94,7 +94,7 @@ def collect( self, metric_reader: "opentelemetry.sdk.metrics.MetricReader", timeout_millis: float = 10_000, - ) -> Iterable[Metric]: + ) -> Optional[MetricsData]: with self._lock: metric_reader_storage = self._reader_storages[metric_reader] diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py index bef57eaab09..3f18813edeb 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py @@ -15,7 +15,7 @@ from logging import getLogger from threading import RLock from time import time_ns -from typing import Dict, List +from typing import Dict, List, Optional from opentelemetry.metrics import ( Asynchronous, @@ -119,7 +119,7 @@ def consume_measurement(self, measurement: Measurement) -> None: ): view_instrument_match.consume_measurement(measurement) - def collect(self) -> MetricsData: + def collect(self) -> Optional[MetricsData]: # Use a list instead of yielding to prevent a slow reader from holding # SDK locks @@ -206,15 +206,19 @@ def collect(self) -> MetricsData: aggregation_temporality=aggregation_temporality, ) - metrics.append( - Metric( - # pylint: disable=protected-access - name=view_instrument_match._name, - description=view_instrument_match._description, - unit=view_instrument_match._instrument.unit, - data=data, + if len(data.data_points) > 0: + metrics.append( + Metric( + # pylint: disable=protected-access + name=view_instrument_match._name, + description=view_instrument_match._description, + unit=view_instrument_match._instrument.unit, + data=data, + ) ) - ) + + if len(metrics) == 0: + continue if instrument.instrumentation_scope not in ( instrumentation_scope_scope_metrics @@ -231,13 +235,15 @@ def collect(self) -> MetricsData: instrument.instrumentation_scope ].metrics.extend(metrics) + scope_metrics = list(instrumentation_scope_scope_metrics.values()) + if len(scope_metrics) == 0: + return None + return MetricsData( resource_metrics=[ ResourceMetrics( resource=self._sdk_config.resource, - scope_metrics=list( - instrumentation_scope_scope_metrics.values() - ), + scope_metrics=scope_metrics, schema_url=self._sdk_config.resource.schema_url, ) ] diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py b/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py index ad90fe9a298..770b9f6253e 100644 --- a/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py @@ -32,13 +32,7 @@ def test_disable_default_views(self): counter.add(10, {"label": "value2"}) counter.add(10, {"label": "value3"}) self.assertEqual( - ( - reader.get_metrics_data() - .resource_metrics[0] - .scope_metrics[0] - .metrics - ), - [], + None, reader.get_metrics_data() ) def test_disable_default_views_add_custom(self): diff --git a/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py b/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py index 97b5532feae..9247db52f9a 100644 --- a/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py +++ b/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py @@ -314,15 +314,7 @@ def test_drop_aggregation(self): ) metric_reader_storage.consume_measurement(Measurement(1, counter)) - self.assertEqual( - [], - ( - metric_reader_storage.collect() - .resource_metrics[0] - .scope_metrics[0] - .metrics - ), - ) + self.assertEqual(None, (metric_reader_storage.collect()),) def test_same_collection_start(self):