From a62f2eecb143ce7f6639fde5e2e763ec9edaa18c Mon Sep 17 00:00:00 2001
From: tammy-baylis-swi
Date: Fri, 9 May 2025 11:03:53 -0700
Subject: [PATCH 1/4] Add OTLP HTTP MetricExporter max export batch size

---
 .../proto/http/metric_exporter/__init__.py    | 454 ++++++++++++++++-
 .../metrics/test_otlp_metrics_exporter.py     | 474 ++++++++++++++++++
 2 files changed, 910 insertions(+), 18 deletions(-)

diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py
index 4feea8d4302..75ebe53d648 100644
--- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py
+++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py
@@ -116,7 +116,29 @@ def __init__(
         preferred_temporality: dict[type, AggregationTemporality] | None = None,
         preferred_aggregation: dict[type, Aggregation] | None = None,
+        max_export_batch_size: int | None = None,
     ):
+        """OTLP HTTP metrics exporter
+
+        Args:
+            endpoint: Target URL to which the exporter is going to send metrics
+            certificate_file: Path to the certificate file to use for any TLS connection
+            client_key_file: Path to the client key file to use for any TLS connection
+            client_certificate_file: Path to the client certificate file to use for any TLS connection
+            headers: Headers to be sent with HTTP requests at export
+            timeout: Timeout in seconds for export
+            compression: Compression to use; one of none, gzip, deflate
+            session: Requests session to use at export
+            preferred_temporality: Map of preferred temporality for each metric type.
+                See `opentelemetry.sdk.metrics.export.MetricReader` for more details on what
+                preferred temporality is.
+            preferred_aggregation: Map of preferred aggregation for each metric type.
+                See `opentelemetry.sdk.metrics.export.MetricReader` for more details on what
+                preferred aggregation is.
+            max_export_batch_size: Maximum number of data points to export in a single request.
+                If not set, there is no limit to the number of data points in a request.
+                If it is set and the number of data points exceeds the max, the request will be split.
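+
+                Example (illustrative; the 1000-point limit is an arbitrary choice)::
+
+                    # Pushes of more than 1000 data points are split into
+                    # several smaller requests
+                    exporter = OTLPMetricExporter(max_export_batch_size=1000)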
+ """ self._endpoint = endpoint or environ.get( OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, _append_metrics_path( @@ -165,6 +187,7 @@ def __init__( self._common_configuration( preferred_temporality, preferred_aggregation ) + self._max_export_batch_size: int | None = max_export_batch_size def _export(self, serialized_data: bytes): data = serialized_data @@ -219,27 +242,422 @@ def export( if delay == self._MAX_RETRY_TIMEOUT: return MetricExportResult.FAILURE - resp = self._export(serialized_data.SerializeToString()) - # pylint: disable=no-else-return - if resp.ok: - return MetricExportResult.SUCCESS - elif self._retryable(resp): - _logger.warning( - "Transient error %s encountered while exporting metric batch, retrying in %ss.", - resp.reason, - delay, - ) - sleep(delay) - continue + if self._max_export_batch_size is None: + resp = self._export(serialized_data.SerializeToString()) + # pylint: disable=no-else-return + if resp.ok: + return MetricExportResult.SUCCESS + elif self._retryable(resp): + _logger.warning( + "Transient error %s encountered while exporting metric batch, retrying in %ss.", + resp.reason, + delay, + ) + sleep(delay) + continue + else: + _logger.error( + "Failed to export batch code: %s, reason: %s", + resp.status_code, + resp.text, + ) + return MetricExportResult.FAILURE + + # Else, attempt export in batches for this retry else: - _logger.error( - "Failed to export batch code: %s, reason: %s", - resp.status_code, - resp.text, - ) - return MetricExportResult.FAILURE + export_result = MetricExportResult.SUCCESS + for split_metrics_data in self._split_metrics_data(serialized_data): + split_resp = self._export( + split_metrics_data.SerializeToString() + ) + + if split_resp.ok: + export_result = MetricExportResult.SUCCESS + elif self._retryable(split_resp): + _logger.warning( + "Transient error %s encountered while exporting metric batch, retrying in %ss.", + split_resp.reason, + delay, + ) + sleep(delay) + continue + else: + _logger.error( + "Failed to export batch code: %s, reason: %s", + split_resp.status_code, + split_resp.text, + ) + export_result = MetricExportResult.FAILURE + + # Return result after all batches are attempted + return export_result + return MetricExportResult.FAILURE + def _split_metrics_data( + self, + metrics_data: pb2.MetricsData, + ) -> Iterable[pb2.MetricsData]: + """Splits metrics data into several MetricsData (copies protobuf originals), + based on configured data point max export batch size. + + Args: + metrics_data: metrics object based on HTTP protocol buffer definition + + Returns: + Iterable[pb2.MetricsData]: An iterable of pb2.MetricsData objects containing + pb2.ResourceMetrics, pb2.ScopeMetrics, pb2.Metrics, and data points + """ + batch_size: int = 0 + # Stores split metrics data as editable references + # used to write batched pb2 objects for export when finalized + split_resource_metrics = [] + + for resource_metrics in metrics_data.resource_metrics: + split_scope_metrics = [] + split_resource_metrics.append( + { + "resource": resource_metrics.resource, + "schema_url": resource_metrics.schema_url, + "scope_metrics": split_scope_metrics, + } + ) + + for scope_metrics in resource_metrics.scope_metrics: + split_metrics = [] + split_scope_metrics.append( + { + "scope": scope_metrics.scope, + "schema_url": scope_metrics.schema_url, + "metrics": split_metrics, + } + ) + + for metric in scope_metrics.metrics: + split_data_points = [] + + # protobuf specifies metrics types (e.g. 
Sum, Histogram)
+                    # with different accessors for data points, etc
+                    # We maintain these structures throughout batch calculation
+                    current_data_points = []
+                    if metric.HasField("sum"):
+                        split_metrics.append(
+                            {
+                                "name": metric.name,
+                                "description": metric.description,
+                                "unit": metric.unit,
+                                "sum": {
+                                    "aggregation_temporality": metric.sum.aggregation_temporality,
+                                    "is_monotonic": metric.sum.is_monotonic,
+                                    "data_points": split_data_points,
+                                }
+                            }
+                        )
+                        current_data_points = metric.sum.data_points
+                    elif metric.HasField("histogram"):
+                        split_metrics.append(
+                            {
+                                "name": metric.name,
+                                "description": metric.description,
+                                "unit": metric.unit,
+                                "histogram": {
+                                    "aggregation_temporality": metric.histogram.aggregation_temporality,
+                                    "data_points": split_data_points,
+                                }
+                            }
+                        )
+                        current_data_points = metric.histogram.data_points
+                    elif metric.HasField("exponential_histogram"):
+                        split_metrics.append(
+                            {
+                                "name": metric.name,
+                                "description": metric.description,
+                                "unit": metric.unit,
+                                "exponential_histogram": {
+                                    "aggregation_temporality": metric.exponential_histogram.aggregation_temporality,
+                                    "data_points": split_data_points,
+                                }
+                            }
+                        )
+                        current_data_points = metric.exponential_histogram.data_points
+                    elif metric.HasField("gauge"):
+                        split_metrics.append(
+                            {
+                                "name": metric.name,
+                                "description": metric.description,
+                                "unit": metric.unit,
+                                "gauge": {
+                                    "data_points": split_data_points,
+                                }
+                            }
+                        )
+                        current_data_points = metric.gauge.data_points
+                    elif metric.HasField("summary"):
+                        split_metrics.append(
+                            {
+                                "name": metric.name,
+                                "description": metric.description,
+                                "unit": metric.unit,
+                                "summary": {
+                                    "data_points": split_data_points,
+                                }
+                            }
+                        )
+                        # Without this assignment, summary data points would
+                        # never be copied into any split batch
+                        current_data_points = metric.summary.data_points
+                    else:
+                        _logger.warning("Tried to split and export an unsupported metric type. Skipping.")
+                        continue
+
+                    for data_point in current_data_points:
+                        split_data_points.append(data_point)
+                        batch_size += 1
+
+                        if batch_size >= self._max_export_batch_size:
+                            yield pb2.MetricsData(
+                                resource_metrics=self._get_split_resource_metrics_pb2(split_resource_metrics)
+                            )
+
+                            # Reset all the reference variables with current metrics_data position
+                            # minus yielded data_points. 
Need to clear data_points and keep metric
+                            # to avoid duplicate data_point export
+                            batch_size = 0
+                            split_data_points = []
+
+                            if metric.HasField("sum"):
+                                split_metrics = [
+                                    {
+                                        "name": metric.name,
+                                        "description": metric.description,
+                                        "unit": metric.unit,
+                                        "sum": {
+                                            "aggregation_temporality": metric.sum.aggregation_temporality,
+                                            "is_monotonic": metric.sum.is_monotonic,
+                                            "data_points": split_data_points,
+                                        }
+                                    }
+                                ]
+                            elif metric.HasField("histogram"):
+                                split_metrics = [
+                                    {
+                                        "name": metric.name,
+                                        "description": metric.description,
+                                        "unit": metric.unit,
+                                        "histogram": {
+                                            "aggregation_temporality": metric.histogram.aggregation_temporality,
+                                            "data_points": split_data_points,
+                                        }
+                                    }
+                                ]
+                            elif metric.HasField("exponential_histogram"):
+                                split_metrics = [
+                                    {
+                                        "name": metric.name,
+                                        "description": metric.description,
+                                        "unit": metric.unit,
+                                        "exponential_histogram": {
+                                            "aggregation_temporality": metric.exponential_histogram.aggregation_temporality,
+                                            "data_points": split_data_points,
+                                        }
+                                    }
+                                ]
+                            elif metric.HasField("gauge"):
+                                split_metrics = [
+                                    {
+                                        "name": metric.name,
+                                        "description": metric.description,
+                                        "unit": metric.unit,
+                                        "gauge": {
+                                            "data_points": split_data_points,
+                                        }
+                                    }
+                                ]
+                            elif metric.HasField("summary"):
+                                split_metrics = [
+                                    {
+                                        "name": metric.name,
+                                        "description": metric.description,
+                                        "unit": metric.unit,
+                                        "summary": {
+                                            "data_points": split_data_points,
+                                        }
+                                    }
+                                ]
+
+                            split_scope_metrics = [
+                                {
+                                    "scope": scope_metrics.scope,
+                                    "schema_url": scope_metrics.schema_url,
+                                    "metrics": split_metrics,
+                                }
+                            ]
+                            split_resource_metrics = [
+                                {
+                                    "resource": resource_metrics.resource,
+                                    "schema_url": resource_metrics.schema_url,
+                                    "scope_metrics": split_scope_metrics,
+                                }
+                            ]
+
+                    if not split_data_points:
+                        # If data_points is empty remove the whole metric
+                        split_metrics.pop()
+
+                if not split_metrics:
+                    # If metrics is empty remove the whole scope_metrics
+                    split_scope_metrics.pop()
+
+            if not split_scope_metrics:
+                # If scope_metrics is empty remove the whole resource_metrics
+                split_resource_metrics.pop()
+
+        if batch_size > 0:
+            yield pb2.MetricsData(
+                resource_metrics=self._get_split_resource_metrics_pb2(split_resource_metrics)
+            )
+
+    def _get_split_resource_metrics_pb2(
+        self,
+        split_resource_metrics: List[Dict],
+    ) -> List[pb2.ResourceMetrics]:
+        """Helper that returns a list of pb2.ResourceMetrics objects based on split_resource_metrics.
+        Example input:
+
+        ```python
+        [
+            {
+                "resource": <pb2.Resource>,
+                "schema_url": "http://foo-bar",
+                "scope_metrics": [
+                    {
+                        "scope": <pb2.InstrumentationScope>,
+                        "schema_url": "http://foo-baz",
+                        "metrics": [
+                            {
+                                "name": "apples",
+                                "description": "number of apples purchased",
+                                "sum": {
+                                    "aggregation_temporality": 1,
+                                    "is_monotonic": False,
+                                    "data_points": [
+                                        pb2.NumberDataPoint(
+                                            start_time_unix_nano=1000,
+                                            time_unix_nano=1001,
+                                            exemplars=[
+                                                pb2.Exemplar(
+                                                    time_unix_nano=1002,
+                                                    span_id=b"foo-span",
+                                                    trace_id=b"foo-trace",
+                                                    as_int=5,
+                                                )
+                                            ],
+                                            as_int=5,
+                                        ),
+                                    ],
+                                },
+                            },
+                        ],
+                    },
+                ],
+            },
+        ]
+        ```
+
+        Args:
+            split_resource_metrics: A list of dict representations of ResourceMetrics,
+                ScopeMetrics, Metrics, and data points. 
+ + Returns: + List[pb2.ResourceMetrics]: A list of pb2.ResourceMetrics objects containing + pb2.ScopeMetrics, pb2.Metrics, and data points + """ + split_resource_metrics_pb = [] + for resource_metrics in split_resource_metrics: + new_resource_metrics = pb2.ResourceMetrics( + resource=resource_metrics.get("resource"), + scope_metrics=[], + schema_url=resource_metrics.get("schema_url"), + ) + for scope_metrics in resource_metrics.get("scope_metrics", []): + new_scope_metrics = pb2.ScopeMetrics( + scope=scope_metrics.get("scope"), + metrics=[], + schema_url=scope_metrics.get("schema_url"), + ) + + for metric in scope_metrics.get("metrics", []): + new_metric = None + data_points = [] + + if "sum" in metric: + new_metric = pb2.Metric( + name=metric.get("name"), + description=metric.get("description"), + unit=metric.get("unit"), + sum=pb2.Sum( + data_points=[], + aggregation_temporality=metric.get("sum").get("aggregation_temporality"), + is_monotonic=metric.get("sum").get("is_monotonic"), + ) + ) + data_points = metric.get("sum").get("data_points") + elif "histogram" in metric: + new_metric = pb2.Metric( + name=metric.get("name"), + description=metric.get("description"), + unit=metric.get("unit"), + histogram=pb2.Histogram( + data_points=[], + aggregation_temporality=metric.get("histogram").get("aggregation_temporality"), + ), + ) + data_points = metric.get("histogram").get("data_points") + elif "exponential_histogram" in metric: + new_metric = pb2.Metric( + name=metric.get("name"), + description=metric.get("description"), + unit=metric.get("unit"), + exponential_histogram=pb2.ExponentialHistogram( + data_points=[], + aggregation_temporality=metric.get("exponential_histogram").get("aggregation_temporality"), + ), + ) + data_points = metric.get("exponential_histogram").get("data_points") + elif "gauge" in metric: + new_metric = pb2.Metric( + name=metric.get("name"), + description=metric.get("description"), + unit=metric.get("unit"), + gauge=pb2.Gauge( + data_points=[], + ) + ) + data_points = metric.get("gauge").get("data_points") + elif "summary" in metric: + new_metric = pb2.Metric( + name=metric.get("name"), + description=metric.get("description"), + unit=metric.get("unit"), + summary=pb2.Summary( + data_points=[], + ) + ) + data_points = metric.get("summary").get("data_points") + else: + _logger.warning("Tried to split and export an unsupported metric type. 
Skipping.") + continue + + for data_point in data_points: + if "sum" in metric: + new_metric.sum.data_points.append(data_point) + elif "histogram" in metric: + new_metric.histogram.data_points.append(data_point) + elif "exponential_histogram" in metric: + new_metric.exponential_histogram.data_points.append(data_point) + elif "gauge" in metric: + new_metric.gauge.data_points.append(data_point) + elif "summary" in metric: + new_metric.summary.data_points.append(data_point) + + new_scope_metrics.metrics.append(new_metric) + new_resource_metrics.scope_metrics.append(new_scope_metrics) + split_resource_metrics_pb.append(new_resource_metrics) + return split_resource_metrics_pb + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: pass diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index 16bb3e54286..d5a4c734d09 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -14,6 +14,7 @@ from logging import WARNING from os import environ +from typing import List from unittest import TestCase from unittest.mock import MagicMock, Mock, call, patch @@ -33,6 +34,12 @@ OTLPMetricExporter, ) from opentelemetry.exporter.otlp.proto.http.version import __version__ +from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2 +from opentelemetry.proto.common.v1.common_pb2 import ( + InstrumentationScope, + KeyValue, +) +from opentelemetry.proto.resource.v1.resource_pb2 import Resource as Pb2Resource from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_CERTIFICATE, OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, @@ -86,6 +93,7 @@ # pylint: disable=protected-access class TestOTLPMetricExporter(TestCase): + # pylint: disable=too-many-public-methods def setUp(self): self.metrics = { "sum_int": MetricsData( @@ -331,6 +339,426 @@ def test_serialization(self, mock_post): cert=exporter._client_cert, ) + def test_split_metrics_data_many_data_points(self): + metrics_data = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ) + split_metrics_data: List[MetricsData] = list( + # pylint: disable=protected-access + OTLPMetricExporter(max_export_batch_size=2)._split_metrics_data( + metrics_data=metrics_data, + ) + ) + + self.assertEqual( + [ + pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + ], + ), + ], + ), + ], + ), + ] + ), + pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ), + ], + split_metrics_data, + ) + + def test_split_metrics_data_nb_data_points_equal_batch_size(self): + metrics_data = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + 
_number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ) + + split_metrics_data: List[MetricsData] = list( + # pylint: disable=protected-access + OTLPMetricExporter(max_export_batch_size=3)._split_metrics_data( + metrics_data=metrics_data, + ) + ) + + self.assertEqual( + [ + pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ), + ], + split_metrics_data, + ) + + def test_split_metrics_data_many_resources_scopes_metrics(self): + # GIVEN + metrics_data = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + ], + ), + _gauge( + index=2, + data_points=[ + _number_data_point(12), + ], + ), + ], + ), + _scope_metrics( + index=2, + metrics=[ + _gauge( + index=3, + data_points=[ + _number_data_point(13), + ], + ), + ], + ), + ], + ), + _resource_metrics( + index=2, + scope_metrics=[ + _scope_metrics( + index=3, + metrics=[ + _gauge( + index=4, + data_points=[ + _number_data_point(14), + ], + ), + ], + ), + ], + ), + ] + ) + + split_metrics_data: List[MetricsData] = list( + # pylint: disable=protected-access + OTLPMetricExporter(max_export_batch_size=2)._split_metrics_data( + metrics_data=metrics_data, + ) + ) + + self.assertEqual( + [ + pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + ], + ), + _gauge( + index=2, + data_points=[ + _number_data_point(12), + ], + ), + ], + ), + ], + ), + ] + ), + pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=2, + metrics=[ + _gauge( + index=3, + data_points=[ + _number_data_point(13), + ], + ), + ], + ), + ], + ), + _resource_metrics( + index=2, + scope_metrics=[ + _scope_metrics( + index=3, + metrics=[ + _gauge( + index=4, + data_points=[ + _number_data_point(14), + ], + ), + ], + ), + ], + ), + ] + ), + ], + split_metrics_data, + ) + + def test_get_split_resource_metrics_pb2_one_of_each(self): + split_resource_metrics = [ + { + "resource": Pb2Resource( + attributes=[ + KeyValue(key="foo", value={"string_value": "bar"}) + ], + ), + "schema_url": "http://foo-bar", + "scope_metrics": [ + { + "scope": InstrumentationScope(name="foo-scope", version="1.0.0"), + "schema_url": "http://foo-baz", + "metrics": [ + { + "name": "foo-metric", + "description": "foo-description", + "unit": "foo-unit", + "sum": { + "aggregation_temporality": 1, + "is_monotonic": True, + "data_points": [ + pb2.NumberDataPoint( + attributes=[ + KeyValue(key="dp_key", value={"string_value": "dp_value"}) + ], + start_time_unix_nano=12345, + time_unix_nano=12350, + as_double=42.42, + ) + ], + }, + } + ], + } + ], + } + ] + + result = OTLPMetricExporter()._get_split_resource_metrics_pb2(split_resource_metrics) + self.assertEqual(len(result), 1) + self.assertIsInstance(result[0], pb2.ResourceMetrics) + self.assertEqual(result[0].schema_url, "http://foo-bar") + self.assertEqual(len(result[0].scope_metrics), 1) + self.assertEqual(result[0].scope_metrics[0].scope.name, "foo-scope") + self.assertEqual(len(result[0].scope_metrics[0].metrics), 1) + self.assertEqual(result[0].scope_metrics[0].metrics[0].name, "foo-metric") + 
self.assertEqual(result[0].scope_metrics[0].metrics[0].sum.is_monotonic, True) + + def test_get_split_resource_metrics_pb2_multiples(self): + split_resource_metrics = [ + { + "resource": Pb2Resource( + attributes=[KeyValue(key="foo1", value={"string_value": "bar2"})], + ), + "schema_url": "http://foo-bar-1", + "scope_metrics": [ + { + "scope": InstrumentationScope(name="foo-scope-1", version="1.0.0"), + "schema_url": "http://foo-baz-1", + "metrics": [ + { + "name": "foo-metric-1", + "description": "foo-description-1", + "unit": "foo-unit-1", + "gauge": { + "data_points": [ + pb2.NumberDataPoint( + attributes=[ + KeyValue(key="dp_key", value={"string_value": "dp_value"}) + ], + start_time_unix_nano=12345, + time_unix_nano=12350, + as_double=42.42, + ) + ], + }, + } + ], + } + ], + }, + { + "resource": Pb2Resource( + attributes=[KeyValue(key="foo2", value={"string_value": "bar2"})], + ), + "schema_url": "http://foo-bar-2", + "scope_metrics": [ + { + "scope": InstrumentationScope(name="foo-scope-2", version="2.0.0"), + "schema_url": "http://foo-baz-2", + "metrics": [ + { + "name": "foo-metric-2", + "description": "foo-description-2", + "unit": "foo-unit-2", + "histogram": { + "aggregation_temporality": 2, + "data_points": [ + pb2.HistogramDataPoint( + attributes=[KeyValue(key="dp_key", value={"string_value": "dp_value"})], + start_time_unix_nano=12345, + time_unix_nano=12350, + ) + ], + }, + } + ], + } + ], + }, + ] + + result = OTLPMetricExporter()._get_split_resource_metrics_pb2(split_resource_metrics) + self.assertEqual(len(result), 2) + self.assertEqual(result[0].schema_url, "http://foo-bar-1") + self.assertEqual(result[1].schema_url, "http://foo-bar-2") + self.assertEqual(len(result[0].scope_metrics), 1) + self.assertEqual(len(result[1].scope_metrics), 1) + self.assertEqual(result[0].scope_metrics[0].scope.name, "foo-scope-1") + self.assertEqual(result[1].scope_metrics[0].scope.name, "foo-scope-2") + self.assertEqual(result[0].scope_metrics[0].metrics[0].name, "foo-metric-1") + self.assertEqual(result[1].scope_metrics[0].metrics[0].name, "foo-metric-2") + + def test_get_split_resource_metrics_pb2_unsupported_metric_type(self): + split_resource_metrics = [ + { + "resource": Pb2Resource( + attributes=[KeyValue(key="foo", value={"string_value": "bar"})], + ), + "schema_url": "http://foo-bar", + "scope_metrics": [ + { + "scope": InstrumentationScope(name="foo", version="1.0.0"), + "schema_url": "http://foo-baz", + "metrics": [ + { + "name": "unsupported-metric", + "description": "foo-bar", + "unit": "foo-bar", + "unsupported_metric_type": {}, + } + ], + } + ], + } + ] + + with self.assertLogs(level="WARNING") as log: + result = OTLPMetricExporter()._get_split_resource_metrics_pb2(split_resource_metrics) + self.assertEqual(len(result), 1) + self.assertIn("Tried to split and export an unsupported metric type", log.output[0]) + @activate @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.sleep") def test_exponential_backoff(self, mock_sleep): @@ -523,3 +951,49 @@ def test_preferred_aggregation_override(self): self.assertEqual( exporter._preferred_aggregation[Histogram], histogram_aggregation ) + + + +def _resource_metrics( + index: int, scope_metrics: List[pb2.ScopeMetrics] +) -> pb2.ResourceMetrics: + return pb2.ResourceMetrics( + resource={ + "attributes": [ + KeyValue(key="a", value={"int_value": index}) + ], + }, + schema_url=f"resource_url_{index}", + scope_metrics=scope_metrics, + ) + + +def _scope_metrics(index: int, metrics: List[pb2.Metric]) -> pb2.ScopeMetrics: + return 
pb2.ScopeMetrics( + scope=InstrumentationScope(name=f"scope_{index}"), + schema_url=f"scope_url_{index}", + metrics=metrics, + ) + + +def _gauge(index: int, data_points: List[pb2.NumberDataPoint]) -> pb2.Metric: + return pb2.Metric( + name=f"gauge_{index}", + description="description", + unit="unit", + gauge=pb2.Gauge( + data_points=data_points + ), + ) + + +def _number_data_point(value: int) -> pb2.NumberDataPoint: + return pb2.NumberDataPoint( + attributes=[ + KeyValue(key="a", value={"int_value": 1}), + KeyValue(key="b", value={"bool_value": True}), + ], + start_time_unix_nano=1641946015139533244, + time_unix_nano=1641946016139533244, + as_int=value, + ) From d09febaf1571c19d725c795666a762847fd12b9c Mon Sep 17 00:00:00 2001 From: tammy-baylis-swi Date: Fri, 9 May 2025 11:20:34 -0700 Subject: [PATCH 2/4] Changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index bc6b3db5227..c8f98afd0cf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- Add configurable `max_export_batch_size` to OTLP HTTP metrics exporter + ([#4576](https://github.com/open-telemetry/opentelemetry-python/pull/4576)) + ## Version 1.33.0/0.54b0 (2025-05-09) - Fix intermittent `Connection aborted` error when using otlp/http exporters From dc86036a11e84ba593bb7632ec5ca7258747c546 Mon Sep 17 00:00:00 2001 From: tammy-baylis-swi Date: Fri, 9 May 2025 11:50:45 -0700 Subject: [PATCH 3/4] Lint --- .../proto/http/metric_exporter/__init__.py | 107 +++++++++++------- .../metrics/test_otlp_metrics_exporter.py | 100 +++++++++++----- 2 files changed, 140 insertions(+), 67 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index 75ebe53d648..033b1404bbb 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -22,6 +22,7 @@ Any, Callable, Dict, + Iterable, List, Mapping, Sequence, @@ -262,11 +263,13 @@ def export( resp.text, ) return MetricExportResult.FAILURE - + # Else, attempt export in batches for this retry else: export_result = MetricExportResult.SUCCESS - for split_metrics_data in self._split_metrics_data(serialized_data): + for split_metrics_data in self._split_metrics_data( + serialized_data + ): split_resp = self._export( split_metrics_data.SerializeToString() ) @@ -288,17 +291,17 @@ def export( split_resp.text, ) export_result = MetricExportResult.FAILURE - + # Return result after all batches are attempted return export_result - + return MetricExportResult.FAILURE def _split_metrics_data( self, metrics_data: pb2.MetricsData, ) -> Iterable[pb2.MetricsData]: - """Splits metrics data into several MetricsData (copies protobuf originals), + """Splits metrics data into several MetricsData (copies protobuf originals), based on configured data point max export batch size. 
Args: @@ -350,7 +353,7 @@ def _split_metrics_data( "aggregation_temporality": metric.sum.aggregation_temporality, "is_monotonic": metric.sum.is_monotonic, "data_points": split_data_points, - } + }, } ) current_data_points = metric.sum.data_points @@ -363,7 +366,7 @@ def _split_metrics_data( "histogram": { "aggregation_temporality": metric.histogram.aggregation_temporality, "data_points": split_data_points, - } + }, } ) current_data_points = metric.histogram.data_points @@ -376,10 +379,12 @@ def _split_metrics_data( "exponential_histogram": { "aggregation_temporality": metric.exponential_histogram.aggregation_temporality, "data_points": split_data_points, - } + }, } ) - current_data_points = metric.exponential_histogram.data_points + current_data_points = ( + metric.exponential_histogram.data_points + ) elif metric.HasField("gauge"): split_metrics.append( { @@ -388,7 +393,7 @@ def _split_metrics_data( "unit": metric.unit, "gauge": { "data_points": split_data_points, - } + }, } ) current_data_points = metric.gauge.data_points @@ -400,11 +405,13 @@ def _split_metrics_data( "unit": metric.unit, "summary": { "data_points": split_data_points, - } + }, } ) else: - _logger.warning("Tried to split and export an unsupported metric type. Skipping.") + _logger.warning( + "Tried to split and export an unsupported metric type. Skipping." + ) continue for data_point in current_data_points: @@ -413,7 +420,9 @@ def _split_metrics_data( if batch_size >= self._max_export_batch_size: yield pb2.MetricsData( - resource_metrics=self._get_split_resource_metrics_pb2(split_resource_metrics) + resource_metrics=self._get_split_resource_metrics_pb2( + split_resource_metrics + ) ) # Reset all the reference variables with current metrics_data position @@ -432,7 +441,7 @@ def _split_metrics_data( "aggregation_temporality": metric.sum.aggregation_temporality, "is_monotonic": metric.sum.is_monotonic, "data_points": split_data_points, - } + }, } ] elif metric.HasField("histogram"): @@ -444,7 +453,7 @@ def _split_metrics_data( "histogram": { "aggregation_temporality": metric.histogram.aggregation_temporality, "data_points": split_data_points, - } + }, } ] elif metric.HasField("exponential_histogram"): @@ -456,7 +465,7 @@ def _split_metrics_data( "exponential_histogram": { "aggregation_temporality": metric.exponential_histogram.aggregation_temporality, "data_points": split_data_points, - } + }, } ] elif metric.HasField("gauge"): @@ -467,7 +476,7 @@ def _split_metrics_data( "unit": metric.unit, "gauge": { "data_points": split_data_points, - } + }, } ] elif metric.HasField("summary"): @@ -478,7 +487,7 @@ def _split_metrics_data( "unit": metric.unit, "summary": { "data_points": split_data_points, - } + }, } ] @@ -511,7 +520,9 @@ def _split_metrics_data( if batch_size > 0: yield pb2.MetricsData( - resource_metrics=self._get_split_resource_metrics_pb2(split_resource_metrics) + resource_metrics=self._get_split_resource_metrics_pb2( + split_resource_metrics + ) ) def _get_split_resource_metrics_pb2( @@ -568,16 +579,16 @@ def _get_split_resource_metrics_pb2( split_resource_metrics_pb = [] for resource_metrics in split_resource_metrics: new_resource_metrics = pb2.ResourceMetrics( - resource=resource_metrics.get("resource"), - scope_metrics=[], - schema_url=resource_metrics.get("schema_url"), - ) + resource=resource_metrics.get("resource"), + scope_metrics=[], + schema_url=resource_metrics.get("schema_url"), + ) for scope_metrics in resource_metrics.get("scope_metrics", []): new_scope_metrics = pb2.ScopeMetrics( - 
scope=scope_metrics.get("scope"), - metrics=[], - schema_url=scope_metrics.get("schema_url"), - ) + scope=scope_metrics.get("scope"), + metrics=[], + schema_url=scope_metrics.get("schema_url"), + ) for metric in scope_metrics.get("metrics", []): new_metric = None @@ -590,9 +601,13 @@ def _get_split_resource_metrics_pb2( unit=metric.get("unit"), sum=pb2.Sum( data_points=[], - aggregation_temporality=metric.get("sum").get("aggregation_temporality"), - is_monotonic=metric.get("sum").get("is_monotonic"), - ) + aggregation_temporality=metric.get("sum").get( + "aggregation_temporality" + ), + is_monotonic=metric.get("sum").get( + "is_monotonic" + ), + ), ) data_points = metric.get("sum").get("data_points") elif "histogram" in metric: @@ -602,10 +617,14 @@ def _get_split_resource_metrics_pb2( unit=metric.get("unit"), histogram=pb2.Histogram( data_points=[], - aggregation_temporality=metric.get("histogram").get("aggregation_temporality"), + aggregation_temporality=metric.get( + "histogram" + ).get("aggregation_temporality"), ), ) - data_points = metric.get("histogram").get("data_points") + data_points = metric.get("histogram").get( + "data_points" + ) elif "exponential_histogram" in metric: new_metric = pb2.Metric( name=metric.get("name"), @@ -613,10 +632,14 @@ def _get_split_resource_metrics_pb2( unit=metric.get("unit"), exponential_histogram=pb2.ExponentialHistogram( data_points=[], - aggregation_temporality=metric.get("exponential_histogram").get("aggregation_temporality"), + aggregation_temporality=metric.get( + "exponential_histogram" + ).get("aggregation_temporality"), ), ) - data_points = metric.get("exponential_histogram").get("data_points") + data_points = metric.get("exponential_histogram").get( + "data_points" + ) elif "gauge" in metric: new_metric = pb2.Metric( name=metric.get("name"), @@ -624,7 +647,7 @@ def _get_split_resource_metrics_pb2( unit=metric.get("unit"), gauge=pb2.Gauge( data_points=[], - ) + ), ) data_points = metric.get("gauge").get("data_points") elif "summary" in metric: @@ -634,11 +657,13 @@ def _get_split_resource_metrics_pb2( unit=metric.get("unit"), summary=pb2.Summary( data_points=[], - ) + ), ) data_points = metric.get("summary").get("data_points") else: - _logger.warning("Tried to split and export an unsupported metric type. Skipping.") + _logger.warning( + "Tried to split and export an unsupported metric type. Skipping." 
+ ) continue for data_point in data_points: @@ -647,15 +672,17 @@ def _get_split_resource_metrics_pb2( elif "histogram" in metric: new_metric.histogram.data_points.append(data_point) elif "exponential_histogram" in metric: - new_metric.exponential_histogram.data_points.append(data_point) + new_metric.exponential_histogram.data_points.append( + data_point + ) elif "gauge" in metric: new_metric.gauge.data_points.append(data_point) elif "summary" in metric: new_metric.summary.data_points.append(data_point) - + new_scope_metrics.metrics.append(new_metric) new_resource_metrics.scope_metrics.append(new_scope_metrics) - split_resource_metrics_pb.append(new_resource_metrics) + split_resource_metrics_pb.append(new_resource_metrics) return split_resource_metrics_pb def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index d5a4c734d09..6751c9bd651 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +# pylint: disable=too-many-lines from logging import WARNING from os import environ from typing import List @@ -34,12 +35,14 @@ OTLPMetricExporter, ) from opentelemetry.exporter.otlp.proto.http.version import __version__ -from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2 from opentelemetry.proto.common.v1.common_pb2 import ( InstrumentationScope, KeyValue, ) -from opentelemetry.proto.resource.v1.resource_pb2 import Resource as Pb2Resource +from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2 +from opentelemetry.proto.resource.v1.resource_pb2 import ( + Resource as Pb2Resource, +) from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_CERTIFICATE, OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, @@ -617,7 +620,9 @@ def test_get_split_resource_metrics_pb2_one_of_each(self): "schema_url": "http://foo-bar", "scope_metrics": [ { - "scope": InstrumentationScope(name="foo-scope", version="1.0.0"), + "scope": InstrumentationScope( + name="foo-scope", version="1.0.0" + ), "schema_url": "http://foo-baz", "metrics": [ { @@ -630,7 +635,12 @@ def test_get_split_resource_metrics_pb2_one_of_each(self): "data_points": [ pb2.NumberDataPoint( attributes=[ - KeyValue(key="dp_key", value={"string_value": "dp_value"}) + KeyValue( + key="dp_key", + value={ + "string_value": "dp_value" + }, + ) ], start_time_unix_nano=12345, time_unix_nano=12350, @@ -645,26 +655,36 @@ def test_get_split_resource_metrics_pb2_one_of_each(self): } ] - result = OTLPMetricExporter()._get_split_resource_metrics_pb2(split_resource_metrics) + result = OTLPMetricExporter()._get_split_resource_metrics_pb2( + split_resource_metrics + ) self.assertEqual(len(result), 1) self.assertIsInstance(result[0], pb2.ResourceMetrics) self.assertEqual(result[0].schema_url, "http://foo-bar") self.assertEqual(len(result[0].scope_metrics), 1) self.assertEqual(result[0].scope_metrics[0].scope.name, "foo-scope") self.assertEqual(len(result[0].scope_metrics[0].metrics), 1) - self.assertEqual(result[0].scope_metrics[0].metrics[0].name, "foo-metric") - self.assertEqual(result[0].scope_metrics[0].metrics[0].sum.is_monotonic, True) + self.assertEqual( + 
result[0].scope_metrics[0].metrics[0].name, "foo-metric" + ) + self.assertEqual( + result[0].scope_metrics[0].metrics[0].sum.is_monotonic, True + ) def test_get_split_resource_metrics_pb2_multiples(self): split_resource_metrics = [ { "resource": Pb2Resource( - attributes=[KeyValue(key="foo1", value={"string_value": "bar2"})], + attributes=[ + KeyValue(key="foo1", value={"string_value": "bar2"}) + ], ), "schema_url": "http://foo-bar-1", "scope_metrics": [ { - "scope": InstrumentationScope(name="foo-scope-1", version="1.0.0"), + "scope": InstrumentationScope( + name="foo-scope-1", version="1.0.0" + ), "schema_url": "http://foo-baz-1", "metrics": [ { @@ -675,7 +695,12 @@ def test_get_split_resource_metrics_pb2_multiples(self): "data_points": [ pb2.NumberDataPoint( attributes=[ - KeyValue(key="dp_key", value={"string_value": "dp_value"}) + KeyValue( + key="dp_key", + value={ + "string_value": "dp_value" + }, + ) ], start_time_unix_nano=12345, time_unix_nano=12350, @@ -690,12 +715,16 @@ def test_get_split_resource_metrics_pb2_multiples(self): }, { "resource": Pb2Resource( - attributes=[KeyValue(key="foo2", value={"string_value": "bar2"})], + attributes=[ + KeyValue(key="foo2", value={"string_value": "bar2"}) + ], ), "schema_url": "http://foo-bar-2", "scope_metrics": [ { - "scope": InstrumentationScope(name="foo-scope-2", version="2.0.0"), + "scope": InstrumentationScope( + name="foo-scope-2", version="2.0.0" + ), "schema_url": "http://foo-baz-2", "metrics": [ { @@ -706,7 +735,14 @@ def test_get_split_resource_metrics_pb2_multiples(self): "aggregation_temporality": 2, "data_points": [ pb2.HistogramDataPoint( - attributes=[KeyValue(key="dp_key", value={"string_value": "dp_value"})], + attributes=[ + KeyValue( + key="dp_key", + value={ + "string_value": "dp_value" + }, + ) + ], start_time_unix_nano=12345, time_unix_nano=12350, ) @@ -719,7 +755,9 @@ def test_get_split_resource_metrics_pb2_multiples(self): }, ] - result = OTLPMetricExporter()._get_split_resource_metrics_pb2(split_resource_metrics) + result = OTLPMetricExporter()._get_split_resource_metrics_pb2( + split_resource_metrics + ) self.assertEqual(len(result), 2) self.assertEqual(result[0].schema_url, "http://foo-bar-1") self.assertEqual(result[1].schema_url, "http://foo-bar-2") @@ -727,19 +765,27 @@ def test_get_split_resource_metrics_pb2_multiples(self): self.assertEqual(len(result[1].scope_metrics), 1) self.assertEqual(result[0].scope_metrics[0].scope.name, "foo-scope-1") self.assertEqual(result[1].scope_metrics[0].scope.name, "foo-scope-2") - self.assertEqual(result[0].scope_metrics[0].metrics[0].name, "foo-metric-1") - self.assertEqual(result[1].scope_metrics[0].metrics[0].name, "foo-metric-2") + self.assertEqual( + result[0].scope_metrics[0].metrics[0].name, "foo-metric-1" + ) + self.assertEqual( + result[1].scope_metrics[0].metrics[0].name, "foo-metric-2" + ) def test_get_split_resource_metrics_pb2_unsupported_metric_type(self): split_resource_metrics = [ { "resource": Pb2Resource( - attributes=[KeyValue(key="foo", value={"string_value": "bar"})], + attributes=[ + KeyValue(key="foo", value={"string_value": "bar"}) + ], ), "schema_url": "http://foo-bar", "scope_metrics": [ { - "scope": InstrumentationScope(name="foo", version="1.0.0"), + "scope": InstrumentationScope( + name="foo", version="1.0.0" + ), "schema_url": "http://foo-baz", "metrics": [ { @@ -755,9 +801,14 @@ def test_get_split_resource_metrics_pb2_unsupported_metric_type(self): ] with self.assertLogs(level="WARNING") as log: - result = 
OTLPMetricExporter()._get_split_resource_metrics_pb2(split_resource_metrics) + result = OTLPMetricExporter()._get_split_resource_metrics_pb2( + split_resource_metrics + ) self.assertEqual(len(result), 1) - self.assertIn("Tried to split and export an unsupported metric type", log.output[0]) + self.assertIn( + "Tried to split and export an unsupported metric type", + log.output[0], + ) @activate @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.sleep") @@ -953,15 +1004,12 @@ def test_preferred_aggregation_override(self): ) - def _resource_metrics( index: int, scope_metrics: List[pb2.ScopeMetrics] ) -> pb2.ResourceMetrics: return pb2.ResourceMetrics( resource={ - "attributes": [ - KeyValue(key="a", value={"int_value": index}) - ], + "attributes": [KeyValue(key="a", value={"int_value": index})], }, schema_url=f"resource_url_{index}", scope_metrics=scope_metrics, @@ -981,9 +1029,7 @@ def _gauge(index: int, data_points: List[pb2.NumberDataPoint]) -> pb2.Metric: name=f"gauge_{index}", description="description", unit="unit", - gauge=pb2.Gauge( - data_points=data_points - ), + gauge=pb2.Gauge(data_points=data_points), ) From f1ef6c404faab53a8eb9685206e8e8010a0b5f77 Mon Sep 17 00:00:00 2001 From: tammy-baylis-swi Date: Mon, 12 May 2025 15:01:37 -0700 Subject: [PATCH 4/4] HTTP metrics export batching does not retry if big failure --- .../proto/http/metric_exporter/__init__.py | 80 ++++++++------ .../metrics/test_otlp_metrics_exporter.py | 102 ++++++++++++++++++ 2 files changed, 147 insertions(+), 35 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index 033b1404bbb..78c7043b994 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -237,13 +237,14 @@ def export( **kwargs, ) -> MetricExportResult: serialized_data = encode_metrics(metrics_data) - for delay in _create_exp_backoff_generator( - max_value=self._MAX_RETRY_TIMEOUT - ): - if delay == self._MAX_RETRY_TIMEOUT: - return MetricExportResult.FAILURE - if self._max_export_batch_size is None: + if self._max_export_batch_size is None: + for delay in _create_exp_backoff_generator( + max_value=self._MAX_RETRY_TIMEOUT + ): + if delay == self._MAX_RETRY_TIMEOUT: + return MetricExportResult.FAILURE + resp = self._export(serialized_data.SerializeToString()) # pylint: disable=no-else-return if resp.ok: @@ -264,38 +265,47 @@ def export( ) return MetricExportResult.FAILURE - # Else, attempt export in batches for this retry - else: - export_result = MetricExportResult.SUCCESS - for split_metrics_data in self._split_metrics_data( - serialized_data - ): - split_resp = self._export( - split_metrics_data.SerializeToString() - ) + return MetricExportResult.FAILURE - if split_resp.ok: - export_result = MetricExportResult.SUCCESS - elif self._retryable(split_resp): - _logger.warning( - "Transient error %s encountered while exporting metric batch, retrying in %ss.", - split_resp.reason, - delay, - ) - sleep(delay) - continue - else: - _logger.error( - "Failed to export batch code: %s, reason: %s", - split_resp.status_code, - split_resp.text, - ) - export_result = MetricExportResult.FAILURE + # Else, attempt export in batches + 
split_metrics_batches = list(self._split_metrics_data(serialized_data)) + export_result = MetricExportResult.SUCCESS + + for split_metrics_data in split_metrics_batches: + # Export current batch until success, non-transient error, or timeout reached + for delay in _create_exp_backoff_generator( + max_value=self._MAX_RETRY_TIMEOUT + ): + if delay == self._MAX_RETRY_TIMEOUT: + export_result = MetricExportResult.FAILURE + break - # Return result after all batches are attempted - return export_result + split_resp = self._export( + split_metrics_data.SerializeToString() + ) + # pylint: disable=no-else-return + if split_resp.ok: + export_result = MetricExportResult.SUCCESS + break + elif self._retryable(split_resp): + _logger.warning( + "Transient error %s encountered while exporting metric batch, retrying in %ss.", + split_resp.reason, + delay, + ) + sleep(delay) + continue + else: + _logger.error( + "Failed to export batch code: %s, reason: %s", + split_resp.status_code, + split_resp.text, + ) + export_result = MetricExportResult.FAILURE + break - return MetricExportResult.FAILURE + # Return last result after all batches are attempted + return export_result def _split_metrics_data( self, diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index 6751c9bd651..68fb526e801 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -831,6 +831,108 @@ def test_exponential_backoff(self, mock_sleep): [call(1), call(2), call(4), call(8), call(16), call(32)] ) + @patch.object(OTLPMetricExporter, "_export") + @patch( + "opentelemetry.exporter.otlp.proto.http.metric_exporter._create_exp_backoff_generator" + ) + @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.sleep") + @patch( + "opentelemetry.exporter.otlp.proto.http.metric_exporter.encode_metrics" + ) + def test_export_retries_with_batching( + self, + mock_encode_metrics, + mock_sleep, + mock_backoff_generator, + mock_export, + ): + mock_backoff_generator.return_value = iter([1, 2, 4]) + mock_export.side_effect = [ + # Non-retryable + MagicMock(ok=False, status_code=400, reason="bad request"), + # Retryable + MagicMock( + ok=False, status_code=500, reason="internal server error" + ), + # Success + MagicMock(ok=True), + ] + mock_encode_metrics.return_value = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ) + batch_1 = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + ], + ), + ], + ), + ], + ), + ] + ) + batch_2 = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ) + + exporter = OTLPMetricExporter(max_export_batch_size=2) + result = exporter.export("foo") + self.assertEqual(result, MetricExportResult.SUCCESS) + self.assertEqual(mock_export.call_count, 3) + 
mock_export.assert_has_calls( + [ + call(batch_1.SerializeToString()), + call(batch_2.SerializeToString()), + call(batch_2.SerializeToString()), + ] + ) + def test_aggregation_temporality(self): otlp_metric_exporter = OTLPMetricExporter()
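
Usage sketch (illustrative, not part of the patches above; the endpoint value
and the 1000-point limit are assumptions): wiring the new option into a
standard SDK pipeline so that oversized pushes are split into several HTTP
requests.

    from opentelemetry.exporter.otlp.proto.http.metric_exporter import (
        OTLPMetricExporter,
    )
    from opentelemetry.sdk.metrics import MeterProvider
    from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader

    # Any metrics push with more than 1000 data points is split into
    # multiple OTLP/HTTP requests of at most 1000 points each
    exporter = OTLPMetricExporter(
        endpoint="http://localhost:4318/v1/metrics",
        max_export_batch_size=1000,
    )
    reader = PeriodicExportingMetricReader(exporter)
    provider = MeterProvider(metric_readers=[reader])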