diff --git a/CHANGELOG.md b/CHANGELOG.md index bac47c6bc01..5373a8ad927 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Refactor `BatchLogRecordProcessor` to simplify code and make the control flow more clear ([#4562](https://github.com/open-telemetry/opentelemetry-python/pull/4562/) and [#4535](https://github.com/open-telemetry/opentelemetry-python/pull/4535)). +- Add configurable `max_export_batch_size` to OTLP HTTP metrics exporter + ([#4576](https://github.com/open-telemetry/opentelemetry-python/pull/4576)) ## Version 1.33.0/0.54b0 (2025-05-09) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index 4feea8d4302..78c7043b994 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -22,6 +22,7 @@ Any, Callable, Dict, + Iterable, List, Mapping, Sequence, @@ -116,7 +117,29 @@ def __init__( preferred_temporality: dict[type, AggregationTemporality] | None = None, preferred_aggregation: dict[type, Aggregation] | None = None, + max_export_batch_size: int | None = None, ): + """OTLP HTTP metrics exporter + + Args: + endpoint: Target URL to which the exporter is going to send metrics + certificate_file: Path to the certificate file to use for any TLS + client_key_file: Path to the client key file to use for any TLS + client_certificate_file: Path to the client certificate file to use for any TLS + headers: Headers to be sent with HTTP requests at export + timeout: Timeout in seconds for export + compression: Compression to use; one of none, gzip, deflate + session: Requests session to use at export + preferred_temporality: Map of preferred temporality for each metric type. + See `opentelemetry.sdk.metrics.export.MetricReader` for more details on what + preferred temporality is. + preferred_aggregation: Map of preferred aggregation for each metric type. + See `opentelemetry.sdk.metrics.export.MetricReader` for more details on what + preferred aggregation is. + max_export_batch_size: Maximum number of data points to export in a single request. + If not set there is no limit to the number of data points in a request. + If it is set and the number of data points exceeds the max, the request will be split. 
+ """ self._endpoint = endpoint or environ.get( OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, _append_metrics_path( @@ -165,6 +188,7 @@ def __init__( self._common_configuration( preferred_temporality, preferred_aggregation ) + self._max_export_batch_size: int | None = max_export_batch_size def _export(self, serialized_data: bytes): data = serialized_data @@ -213,32 +237,463 @@ def export( **kwargs, ) -> MetricExportResult: serialized_data = encode_metrics(metrics_data) - for delay in _create_exp_backoff_generator( - max_value=self._MAX_RETRY_TIMEOUT - ): - if delay == self._MAX_RETRY_TIMEOUT: - return MetricExportResult.FAILURE - - resp = self._export(serialized_data.SerializeToString()) - # pylint: disable=no-else-return - if resp.ok: - return MetricExportResult.SUCCESS - elif self._retryable(resp): - _logger.warning( - "Transient error %s encountered while exporting metric batch, retrying in %ss.", - resp.reason, - delay, + + if self._max_export_batch_size is None: + for delay in _create_exp_backoff_generator( + max_value=self._MAX_RETRY_TIMEOUT + ): + if delay == self._MAX_RETRY_TIMEOUT: + return MetricExportResult.FAILURE + + resp = self._export(serialized_data.SerializeToString()) + # pylint: disable=no-else-return + if resp.ok: + return MetricExportResult.SUCCESS + elif self._retryable(resp): + _logger.warning( + "Transient error %s encountered while exporting metric batch, retrying in %ss.", + resp.reason, + delay, + ) + sleep(delay) + continue + else: + _logger.error( + "Failed to export batch code: %s, reason: %s", + resp.status_code, + resp.text, + ) + return MetricExportResult.FAILURE + + return MetricExportResult.FAILURE + + # Else, attempt export in batches + split_metrics_batches = list(self._split_metrics_data(serialized_data)) + export_result = MetricExportResult.SUCCESS + + for split_metrics_data in split_metrics_batches: + # Export current batch until success, non-transient error, or timeout reached + for delay in _create_exp_backoff_generator( + max_value=self._MAX_RETRY_TIMEOUT + ): + if delay == self._MAX_RETRY_TIMEOUT: + export_result = MetricExportResult.FAILURE + break + + split_resp = self._export( + split_metrics_data.SerializeToString() ) - sleep(delay) - continue - else: - _logger.error( - "Failed to export batch code: %s, reason: %s", - resp.status_code, - resp.text, + # pylint: disable=no-else-return + if split_resp.ok: + export_result = MetricExportResult.SUCCESS + break + elif self._retryable(split_resp): + _logger.warning( + "Transient error %s encountered while exporting metric batch, retrying in %ss.", + split_resp.reason, + delay, + ) + sleep(delay) + continue + else: + _logger.error( + "Failed to export batch code: %s, reason: %s", + split_resp.status_code, + split_resp.text, + ) + export_result = MetricExportResult.FAILURE + break + + # Return last result after all batches are attempted + return export_result + + def _split_metrics_data( + self, + metrics_data: pb2.MetricsData, + ) -> Iterable[pb2.MetricsData]: + """Splits metrics data into several MetricsData (copies protobuf originals), + based on configured data point max export batch size. 
+
+        Args:
+            metrics_data: metrics object based on HTTP protocol buffer definition
+
+        Returns:
+            Iterable[pb2.MetricsData]: An iterable of pb2.MetricsData objects containing
+                pb2.ResourceMetrics, pb2.ScopeMetrics, pb2.Metrics, and data points
+        """
+        batch_size: int = 0
+        # Stores split metrics data as editable references,
+        # used to write batched pb2 objects for export when finalized
+        split_resource_metrics = []
+
+        for resource_metrics in metrics_data.resource_metrics:
+            split_scope_metrics = []
+            split_resource_metrics.append(
+                {
+                    "resource": resource_metrics.resource,
+                    "schema_url": resource_metrics.schema_url,
+                    "scope_metrics": split_scope_metrics,
+                }
+            )
+
+            for scope_metrics in resource_metrics.scope_metrics:
+                split_metrics = []
+                split_scope_metrics.append(
+                    {
+                        "scope": scope_metrics.scope,
+                        "schema_url": scope_metrics.schema_url,
+                        "metrics": split_metrics,
+                    }
                )
-        return MetricExportResult.FAILURE
-        return MetricExportResult.FAILURE
+
+                for metric in scope_metrics.metrics:
+                    split_data_points = []
+
+                    # protobuf specifies metric types (e.g. Sum, Histogram)
+                    # with different accessors for data points, etc.
+                    # We maintain these structures throughout batch calculation
+                    current_data_points = []
+                    if metric.HasField("sum"):
+                        split_metrics.append(
+                            {
+                                "name": metric.name,
+                                "description": metric.description,
+                                "unit": metric.unit,
+                                "sum": {
+                                    "aggregation_temporality": metric.sum.aggregation_temporality,
+                                    "is_monotonic": metric.sum.is_monotonic,
+                                    "data_points": split_data_points,
+                                },
+                            }
+                        )
+                        current_data_points = metric.sum.data_points
+                    elif metric.HasField("histogram"):
+                        split_metrics.append(
+                            {
+                                "name": metric.name,
+                                "description": metric.description,
+                                "unit": metric.unit,
+                                "histogram": {
+                                    "aggregation_temporality": metric.histogram.aggregation_temporality,
+                                    "data_points": split_data_points,
+                                },
+                            }
+                        )
+                        current_data_points = metric.histogram.data_points
+                    elif metric.HasField("exponential_histogram"):
+                        split_metrics.append(
+                            {
+                                "name": metric.name,
+                                "description": metric.description,
+                                "unit": metric.unit,
+                                "exponential_histogram": {
+                                    "aggregation_temporality": metric.exponential_histogram.aggregation_temporality,
+                                    "data_points": split_data_points,
+                                },
+                            }
+                        )
+                        current_data_points = (
+                            metric.exponential_histogram.data_points
+                        )
+                    elif metric.HasField("gauge"):
+                        split_metrics.append(
+                            {
+                                "name": metric.name,
+                                "description": metric.description,
+                                "unit": metric.unit,
+                                "gauge": {
+                                    "data_points": split_data_points,
+                                },
+                            }
+                        )
+                        current_data_points = metric.gauge.data_points
+                    elif metric.HasField("summary"):
+                        split_metrics.append(
+                            {
+                                "name": metric.name,
+                                "description": metric.description,
+                                "unit": metric.unit,
+                                "summary": {
+                                    "data_points": split_data_points,
+                                },
+                            }
+                        )
+                        current_data_points = metric.summary.data_points
+                    else:
+                        _logger.warning(
+                            "Tried to split and export an unsupported metric type. Skipping."
+                        )
+                        continue
+
+                    for data_point in current_data_points:
+                        split_data_points.append(data_point)
+                        batch_size += 1
+
+                        if batch_size >= self._max_export_batch_size:
+                            yield pb2.MetricsData(
+                                resource_metrics=self._get_split_resource_metrics_pb2(
+                                    split_resource_metrics
+                                )
+                            )
+
+                            # Reset all the reference variables with current metrics_data position
+                            # minus yielded data_points. Need to clear data_points and keep metric
+                            # to avoid duplicate data_point export
+                            batch_size = 0
+                            split_data_points = []
+
+                            if metric.HasField("sum"):
+                                split_metrics = [
+                                    {
+                                        "name": metric.name,
+                                        "description": metric.description,
+                                        "unit": metric.unit,
+                                        "sum": {
+                                            "aggregation_temporality": metric.sum.aggregation_temporality,
+                                            "is_monotonic": metric.sum.is_monotonic,
+                                            "data_points": split_data_points,
+                                        },
+                                    }
+                                ]
+                            elif metric.HasField("histogram"):
+                                split_metrics = [
+                                    {
+                                        "name": metric.name,
+                                        "description": metric.description,
+                                        "unit": metric.unit,
+                                        "histogram": {
+                                            "aggregation_temporality": metric.histogram.aggregation_temporality,
+                                            "data_points": split_data_points,
+                                        },
+                                    }
+                                ]
+                            elif metric.HasField("exponential_histogram"):
+                                split_metrics = [
+                                    {
+                                        "name": metric.name,
+                                        "description": metric.description,
+                                        "unit": metric.unit,
+                                        "exponential_histogram": {
+                                            "aggregation_temporality": metric.exponential_histogram.aggregation_temporality,
+                                            "data_points": split_data_points,
+                                        },
+                                    }
+                                ]
+                            elif metric.HasField("gauge"):
+                                split_metrics = [
+                                    {
+                                        "name": metric.name,
+                                        "description": metric.description,
+                                        "unit": metric.unit,
+                                        "gauge": {
+                                            "data_points": split_data_points,
+                                        },
+                                    }
+                                ]
+                            elif metric.HasField("summary"):
+                                split_metrics = [
+                                    {
+                                        "name": metric.name,
+                                        "description": metric.description,
+                                        "unit": metric.unit,
+                                        "summary": {
+                                            "data_points": split_data_points,
+                                        },
+                                    }
+                                ]
+
+                            split_scope_metrics = [
+                                {
+                                    "scope": scope_metrics.scope,
+                                    "schema_url": scope_metrics.schema_url,
+                                    "metrics": split_metrics,
+                                }
+                            ]
+                            split_resource_metrics = [
+                                {
+                                    "resource": resource_metrics.resource,
+                                    "schema_url": resource_metrics.schema_url,
+                                    "scope_metrics": split_scope_metrics,
+                                }
+                            ]
+
+                    if not split_data_points:
+                        # If data_points is empty, remove the whole metric
+                        split_metrics.pop()
+
+                if not split_metrics:
+                    # If metrics is empty, remove the whole scope_metrics
+                    split_scope_metrics.pop()
+
+            if not split_scope_metrics:
+                # If scope_metrics is empty, remove the whole resource_metrics
+                split_resource_metrics.pop()
+
+        if batch_size > 0:
+            yield pb2.MetricsData(
+                resource_metrics=self._get_split_resource_metrics_pb2(
+                    split_resource_metrics
+                )
+            )
+
+    def _get_split_resource_metrics_pb2(
+        self,
+        split_resource_metrics: List[Dict],
+    ) -> List[pb2.ResourceMetrics]:
+        """Helper that returns a list of pb2.ResourceMetrics objects based on split_resource_metrics.
+        Example input:
+
+        ```python
+        [
+            {
+                "resource": <pb2.Resource>,
+                "schema_url": "http://foo-bar",
+                "scope_metrics": [
+                    {
+                        "scope": <pb2.InstrumentationScope>,
+                        "schema_url": "http://foo-baz",
+                        "metrics": [
+                            {
+                                "name": "apples",
+                                "description": "number of apples purchased",
+                                "sum": {
+                                    "aggregation_temporality": 1,
+                                    "is_monotonic": False,
+                                    "data_points": [
+                                        {
+                                            start_time_unix_nano: 1000
+                                            time_unix_nano: 1001
+                                            exemplars {
+                                                time_unix_nano: 1002
+                                                span_id: "foo-span"
+                                                trace_id: "foo-trace"
+                                                as_int: 5
+                                            }
+                                            as_int: 5
+                                        }
+                                    ],
+                                },
+                            },
+                        ],
+                    },
+                ],
+            },
+        ]
+        ```
+
+        Args:
+            split_resource_metrics: A list of dict representations of ResourceMetrics,
+                ScopeMetrics, Metrics, and data points.
+ + Returns: + List[pb2.ResourceMetrics]: A list of pb2.ResourceMetrics objects containing + pb2.ScopeMetrics, pb2.Metrics, and data points + """ + split_resource_metrics_pb = [] + for resource_metrics in split_resource_metrics: + new_resource_metrics = pb2.ResourceMetrics( + resource=resource_metrics.get("resource"), + scope_metrics=[], + schema_url=resource_metrics.get("schema_url"), + ) + for scope_metrics in resource_metrics.get("scope_metrics", []): + new_scope_metrics = pb2.ScopeMetrics( + scope=scope_metrics.get("scope"), + metrics=[], + schema_url=scope_metrics.get("schema_url"), + ) + + for metric in scope_metrics.get("metrics", []): + new_metric = None + data_points = [] + + if "sum" in metric: + new_metric = pb2.Metric( + name=metric.get("name"), + description=metric.get("description"), + unit=metric.get("unit"), + sum=pb2.Sum( + data_points=[], + aggregation_temporality=metric.get("sum").get( + "aggregation_temporality" + ), + is_monotonic=metric.get("sum").get( + "is_monotonic" + ), + ), + ) + data_points = metric.get("sum").get("data_points") + elif "histogram" in metric: + new_metric = pb2.Metric( + name=metric.get("name"), + description=metric.get("description"), + unit=metric.get("unit"), + histogram=pb2.Histogram( + data_points=[], + aggregation_temporality=metric.get( + "histogram" + ).get("aggregation_temporality"), + ), + ) + data_points = metric.get("histogram").get( + "data_points" + ) + elif "exponential_histogram" in metric: + new_metric = pb2.Metric( + name=metric.get("name"), + description=metric.get("description"), + unit=metric.get("unit"), + exponential_histogram=pb2.ExponentialHistogram( + data_points=[], + aggregation_temporality=metric.get( + "exponential_histogram" + ).get("aggregation_temporality"), + ), + ) + data_points = metric.get("exponential_histogram").get( + "data_points" + ) + elif "gauge" in metric: + new_metric = pb2.Metric( + name=metric.get("name"), + description=metric.get("description"), + unit=metric.get("unit"), + gauge=pb2.Gauge( + data_points=[], + ), + ) + data_points = metric.get("gauge").get("data_points") + elif "summary" in metric: + new_metric = pb2.Metric( + name=metric.get("name"), + description=metric.get("description"), + unit=metric.get("unit"), + summary=pb2.Summary( + data_points=[], + ), + ) + data_points = metric.get("summary").get("data_points") + else: + _logger.warning( + "Tried to split and export an unsupported metric type. Skipping." 
+ ) + continue + + for data_point in data_points: + if "sum" in metric: + new_metric.sum.data_points.append(data_point) + elif "histogram" in metric: + new_metric.histogram.data_points.append(data_point) + elif "exponential_histogram" in metric: + new_metric.exponential_histogram.data_points.append( + data_point + ) + elif "gauge" in metric: + new_metric.gauge.data_points.append(data_point) + elif "summary" in metric: + new_metric.summary.data_points.append(data_point) + + new_scope_metrics.metrics.append(new_metric) + new_resource_metrics.scope_metrics.append(new_scope_metrics) + split_resource_metrics_pb.append(new_resource_metrics) + return split_resource_metrics_pb def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: pass diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index 16bb3e54286..68fb526e801 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -12,8 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +# pylint: disable=too-many-lines from logging import WARNING from os import environ +from typing import List from unittest import TestCase from unittest.mock import MagicMock, Mock, call, patch @@ -33,6 +35,14 @@ OTLPMetricExporter, ) from opentelemetry.exporter.otlp.proto.http.version import __version__ +from opentelemetry.proto.common.v1.common_pb2 import ( + InstrumentationScope, + KeyValue, +) +from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2 +from opentelemetry.proto.resource.v1.resource_pb2 import ( + Resource as Pb2Resource, +) from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_CERTIFICATE, OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, @@ -86,6 +96,7 @@ # pylint: disable=protected-access class TestOTLPMetricExporter(TestCase): + # pylint: disable=too-many-public-methods def setUp(self): self.metrics = { "sum_int": MetricsData( @@ -331,6 +342,474 @@ def test_serialization(self, mock_post): cert=exporter._client_cert, ) + def test_split_metrics_data_many_data_points(self): + metrics_data = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ) + split_metrics_data: List[MetricsData] = list( + # pylint: disable=protected-access + OTLPMetricExporter(max_export_batch_size=2)._split_metrics_data( + metrics_data=metrics_data, + ) + ) + + self.assertEqual( + [ + pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + ], + ), + ], + ), + ], + ), + ] + ), + pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ), + ], + split_metrics_data, + ) + + def test_split_metrics_data_nb_data_points_equal_batch_size(self): + metrics_data = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + 
_scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ) + + split_metrics_data: List[MetricsData] = list( + # pylint: disable=protected-access + OTLPMetricExporter(max_export_batch_size=3)._split_metrics_data( + metrics_data=metrics_data, + ) + ) + + self.assertEqual( + [ + pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ), + ], + split_metrics_data, + ) + + def test_split_metrics_data_many_resources_scopes_metrics(self): + # GIVEN + metrics_data = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + ], + ), + _gauge( + index=2, + data_points=[ + _number_data_point(12), + ], + ), + ], + ), + _scope_metrics( + index=2, + metrics=[ + _gauge( + index=3, + data_points=[ + _number_data_point(13), + ], + ), + ], + ), + ], + ), + _resource_metrics( + index=2, + scope_metrics=[ + _scope_metrics( + index=3, + metrics=[ + _gauge( + index=4, + data_points=[ + _number_data_point(14), + ], + ), + ], + ), + ], + ), + ] + ) + + split_metrics_data: List[MetricsData] = list( + # pylint: disable=protected-access + OTLPMetricExporter(max_export_batch_size=2)._split_metrics_data( + metrics_data=metrics_data, + ) + ) + + self.assertEqual( + [ + pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + ], + ), + _gauge( + index=2, + data_points=[ + _number_data_point(12), + ], + ), + ], + ), + ], + ), + ] + ), + pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=2, + metrics=[ + _gauge( + index=3, + data_points=[ + _number_data_point(13), + ], + ), + ], + ), + ], + ), + _resource_metrics( + index=2, + scope_metrics=[ + _scope_metrics( + index=3, + metrics=[ + _gauge( + index=4, + data_points=[ + _number_data_point(14), + ], + ), + ], + ), + ], + ), + ] + ), + ], + split_metrics_data, + ) + + def test_get_split_resource_metrics_pb2_one_of_each(self): + split_resource_metrics = [ + { + "resource": Pb2Resource( + attributes=[ + KeyValue(key="foo", value={"string_value": "bar"}) + ], + ), + "schema_url": "http://foo-bar", + "scope_metrics": [ + { + "scope": InstrumentationScope( + name="foo-scope", version="1.0.0" + ), + "schema_url": "http://foo-baz", + "metrics": [ + { + "name": "foo-metric", + "description": "foo-description", + "unit": "foo-unit", + "sum": { + "aggregation_temporality": 1, + "is_monotonic": True, + "data_points": [ + pb2.NumberDataPoint( + attributes=[ + KeyValue( + key="dp_key", + value={ + "string_value": "dp_value" + }, + ) + ], + start_time_unix_nano=12345, + time_unix_nano=12350, + as_double=42.42, + ) + ], + }, + } + ], + } + ], + } + ] + + result = OTLPMetricExporter()._get_split_resource_metrics_pb2( + split_resource_metrics + ) + self.assertEqual(len(result), 1) + self.assertIsInstance(result[0], pb2.ResourceMetrics) + self.assertEqual(result[0].schema_url, "http://foo-bar") + self.assertEqual(len(result[0].scope_metrics), 1) + self.assertEqual(result[0].scope_metrics[0].scope.name, 
"foo-scope") + self.assertEqual(len(result[0].scope_metrics[0].metrics), 1) + self.assertEqual( + result[0].scope_metrics[0].metrics[0].name, "foo-metric" + ) + self.assertEqual( + result[0].scope_metrics[0].metrics[0].sum.is_monotonic, True + ) + + def test_get_split_resource_metrics_pb2_multiples(self): + split_resource_metrics = [ + { + "resource": Pb2Resource( + attributes=[ + KeyValue(key="foo1", value={"string_value": "bar2"}) + ], + ), + "schema_url": "http://foo-bar-1", + "scope_metrics": [ + { + "scope": InstrumentationScope( + name="foo-scope-1", version="1.0.0" + ), + "schema_url": "http://foo-baz-1", + "metrics": [ + { + "name": "foo-metric-1", + "description": "foo-description-1", + "unit": "foo-unit-1", + "gauge": { + "data_points": [ + pb2.NumberDataPoint( + attributes=[ + KeyValue( + key="dp_key", + value={ + "string_value": "dp_value" + }, + ) + ], + start_time_unix_nano=12345, + time_unix_nano=12350, + as_double=42.42, + ) + ], + }, + } + ], + } + ], + }, + { + "resource": Pb2Resource( + attributes=[ + KeyValue(key="foo2", value={"string_value": "bar2"}) + ], + ), + "schema_url": "http://foo-bar-2", + "scope_metrics": [ + { + "scope": InstrumentationScope( + name="foo-scope-2", version="2.0.0" + ), + "schema_url": "http://foo-baz-2", + "metrics": [ + { + "name": "foo-metric-2", + "description": "foo-description-2", + "unit": "foo-unit-2", + "histogram": { + "aggregation_temporality": 2, + "data_points": [ + pb2.HistogramDataPoint( + attributes=[ + KeyValue( + key="dp_key", + value={ + "string_value": "dp_value" + }, + ) + ], + start_time_unix_nano=12345, + time_unix_nano=12350, + ) + ], + }, + } + ], + } + ], + }, + ] + + result = OTLPMetricExporter()._get_split_resource_metrics_pb2( + split_resource_metrics + ) + self.assertEqual(len(result), 2) + self.assertEqual(result[0].schema_url, "http://foo-bar-1") + self.assertEqual(result[1].schema_url, "http://foo-bar-2") + self.assertEqual(len(result[0].scope_metrics), 1) + self.assertEqual(len(result[1].scope_metrics), 1) + self.assertEqual(result[0].scope_metrics[0].scope.name, "foo-scope-1") + self.assertEqual(result[1].scope_metrics[0].scope.name, "foo-scope-2") + self.assertEqual( + result[0].scope_metrics[0].metrics[0].name, "foo-metric-1" + ) + self.assertEqual( + result[1].scope_metrics[0].metrics[0].name, "foo-metric-2" + ) + + def test_get_split_resource_metrics_pb2_unsupported_metric_type(self): + split_resource_metrics = [ + { + "resource": Pb2Resource( + attributes=[ + KeyValue(key="foo", value={"string_value": "bar"}) + ], + ), + "schema_url": "http://foo-bar", + "scope_metrics": [ + { + "scope": InstrumentationScope( + name="foo", version="1.0.0" + ), + "schema_url": "http://foo-baz", + "metrics": [ + { + "name": "unsupported-metric", + "description": "foo-bar", + "unit": "foo-bar", + "unsupported_metric_type": {}, + } + ], + } + ], + } + ] + + with self.assertLogs(level="WARNING") as log: + result = OTLPMetricExporter()._get_split_resource_metrics_pb2( + split_resource_metrics + ) + self.assertEqual(len(result), 1) + self.assertIn( + "Tried to split and export an unsupported metric type", + log.output[0], + ) + @activate @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.sleep") def test_exponential_backoff(self, mock_sleep): @@ -352,6 +831,108 @@ def test_exponential_backoff(self, mock_sleep): [call(1), call(2), call(4), call(8), call(16), call(32)] ) + @patch.object(OTLPMetricExporter, "_export") + @patch( + "opentelemetry.exporter.otlp.proto.http.metric_exporter._create_exp_backoff_generator" + ) 
+ @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.sleep") + @patch( + "opentelemetry.exporter.otlp.proto.http.metric_exporter.encode_metrics" + ) + def test_export_retries_with_batching( + self, + mock_encode_metrics, + mock_sleep, + mock_backoff_generator, + mock_export, + ): + mock_backoff_generator.return_value = iter([1, 2, 4]) + mock_export.side_effect = [ + # Non-retryable + MagicMock(ok=False, status_code=400, reason="bad request"), + # Retryable + MagicMock( + ok=False, status_code=500, reason="internal server error" + ), + # Success + MagicMock(ok=True), + ] + mock_encode_metrics.return_value = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ) + batch_1 = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + ], + ), + ], + ), + ], + ), + ] + ) + batch_2 = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ) + + exporter = OTLPMetricExporter(max_export_batch_size=2) + result = exporter.export("foo") + self.assertEqual(result, MetricExportResult.SUCCESS) + self.assertEqual(mock_export.call_count, 3) + mock_export.assert_has_calls( + [ + call(batch_1.SerializeToString()), + call(batch_2.SerializeToString()), + call(batch_2.SerializeToString()), + ] + ) + def test_aggregation_temporality(self): otlp_metric_exporter = OTLPMetricExporter() @@ -523,3 +1104,44 @@ def test_preferred_aggregation_override(self): self.assertEqual( exporter._preferred_aggregation[Histogram], histogram_aggregation ) + + +def _resource_metrics( + index: int, scope_metrics: List[pb2.ScopeMetrics] +) -> pb2.ResourceMetrics: + return pb2.ResourceMetrics( + resource={ + "attributes": [KeyValue(key="a", value={"int_value": index})], + }, + schema_url=f"resource_url_{index}", + scope_metrics=scope_metrics, + ) + + +def _scope_metrics(index: int, metrics: List[pb2.Metric]) -> pb2.ScopeMetrics: + return pb2.ScopeMetrics( + scope=InstrumentationScope(name=f"scope_{index}"), + schema_url=f"scope_url_{index}", + metrics=metrics, + ) + + +def _gauge(index: int, data_points: List[pb2.NumberDataPoint]) -> pb2.Metric: + return pb2.Metric( + name=f"gauge_{index}", + description="description", + unit="unit", + gauge=pb2.Gauge(data_points=data_points), + ) + + +def _number_data_point(value: int) -> pb2.NumberDataPoint: + return pb2.NumberDataPoint( + attributes=[ + KeyValue(key="a", value={"int_value": 1}), + KeyValue(key="b", value={"bool_value": True}), + ], + start_time_unix_nano=1641946015139533244, + time_unix_nano=1641946016139533244, + as_int=value, + )