Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
Sum,
ExponentialHistogram as ExponentialHistogramType,
)
from typing import Dict
from typing import Dict, Optional
from opentelemetry.proto.resource.v1.resource_pb2 import (
Resource as PB2Resource,
)
Expand Down Expand Up @@ -154,7 +154,7 @@ def _common_configuration(
)


def encode_metrics(data: MetricsData) -> ExportMetricsServiceRequest:
def encode_metrics(data: MetricsData) -> Optional[ExportMetricsServiceRequest]:
resource_metrics_dict = {}

for resource_metrics in data.resource_metrics:
Expand All @@ -165,8 +165,6 @@ def encode_metrics(data: MetricsData) -> ExportMetricsServiceRequest:
# associated with an unique resource.
scope_metrics_dict = {}

resource_metrics_dict[resource] = scope_metrics_dict

for scope_metrics in resource_metrics.scope_metrics:

instrumentation_scope = scope_metrics.scope
Expand All @@ -181,9 +179,10 @@ def encode_metrics(data: MetricsData) -> ExportMetricsServiceRequest:
)
)

scope_metrics_dict[instrumentation_scope] = pb2_scope_metrics

for metric in scope_metrics.metrics:
if len(metric.data.data_points) == 0:
continue

pb2_metric = pb2.Metric(
name=metric.name,
description=metric.description,
Expand Down Expand Up @@ -301,6 +300,16 @@ def encode_metrics(data: MetricsData) -> ExportMetricsServiceRequest:

pb2_scope_metrics.metrics.append(pb2_metric)

if len(pb2_scope_metrics.metrics) > 0:
# It's possible that all the metrics have zero
# datapoints. If so, we'll have no metrics at all to
# send.
scope_metrics_dict[instrumentation_scope] = pb2_scope_metrics

if len(scope_metrics_dict) > 0:
# Analogous to the above, it's possible that all scopes are empty.
resource_metrics_dict[resource] = scope_metrics_dict

resource_data = []
for (
sdk_resource,
Expand All @@ -315,4 +324,6 @@ def encode_metrics(data: MetricsData) -> ExportMetricsServiceRequest:
)
)
resource_metrics = resource_data
if len(resource_metrics) == 0:
return None
return ExportMetricsServiceRequest(resource_metrics=resource_metrics)
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def export(
"""Exports a batch of telemetry data.

Args:
metrics: The list of `opentelemetry.sdk.metrics.export.Metric` objects to be exported
metrics: The `opentelemetry.sdk.metrics.export.MetricsData` object to be exported

Returns:
The result of the export
Expand Down Expand Up @@ -217,7 +217,7 @@ def __init__(
"opentelemetry.sdk.metrics.export.MetricReader",
AggregationTemporality,
],
Iterable["opentelemetry.sdk.metrics.export.Metric"],
Optional["opentelemetry.sdk.metrics.export.MetricsData"],
] = None

self._instrument_class_temporality = {
Expand Down Expand Up @@ -306,7 +306,8 @@ def __init__(
@final
def collect(self, timeout_millis: float = 10_000) -> None:
"""Collects the metrics from the internal SDK state and
invokes the `_receive_metrics` with the collection.
invokes the `_receive_metrics` with the collection if it
contains any data.

Args:
timeout_millis: Amount of time in milliseconds before this function
Expand Down Expand Up @@ -335,7 +336,7 @@ def _set_collect_callback(
"opentelemetry.sdk.metrics.export.MetricReader",
AggregationTemporality,
],
Iterable["opentelemetry.sdk.metrics.export.Metric"],
Optional["opentelemetry.sdk.metrics.export.MetricsData"],
],
) -> None:
"""This function is internal to the SDK. It should not be called or overridden by users"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from abc import ABC, abstractmethod
from threading import Lock
from time import time_ns
from typing import Iterable, List, Mapping
from typing import Iterable, List, Mapping, Optional

# This kind of import is needed to avoid Sphinx errors.
import opentelemetry.sdk.metrics
Expand All @@ -29,7 +29,7 @@
from opentelemetry.sdk.metrics._internal.metric_reader_storage import (
MetricReaderStorage,
)
from opentelemetry.sdk.metrics._internal.point import Metric
from opentelemetry.sdk.metrics._internal.point import MetricsData


class MeasurementConsumer(ABC):
Expand All @@ -51,7 +51,7 @@ def collect(
self,
metric_reader: "opentelemetry.sdk.metrics.MetricReader",
timeout_millis: float = 10_000,
) -> Iterable[Metric]:
) -> Optional[MetricsData]:
pass


Expand Down Expand Up @@ -94,7 +94,7 @@ def collect(
self,
metric_reader: "opentelemetry.sdk.metrics.MetricReader",
timeout_millis: float = 10_000,
) -> Iterable[Metric]:
) -> Optional[MetricsData]:

with self._lock:
metric_reader_storage = self._reader_storages[metric_reader]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from logging import getLogger
from threading import RLock
from time import time_ns
from typing import Dict, List
from typing import Dict, List, Optional

from opentelemetry.metrics import (
Asynchronous,
Expand Down Expand Up @@ -119,7 +119,7 @@ def consume_measurement(self, measurement: Measurement) -> None:
):
view_instrument_match.consume_measurement(measurement)

def collect(self) -> MetricsData:
def collect(self) -> Optional[MetricsData]:
# Use a list instead of yielding to prevent a slow reader from holding
# SDK locks

Expand Down Expand Up @@ -206,15 +206,19 @@ def collect(self) -> MetricsData:
aggregation_temporality=aggregation_temporality,
)

metrics.append(
Metric(
# pylint: disable=protected-access
name=view_instrument_match._name,
description=view_instrument_match._description,
unit=view_instrument_match._instrument.unit,
data=data,
if len(data.data_points) > 0:
metrics.append(
Metric(
# pylint: disable=protected-access
name=view_instrument_match._name,
description=view_instrument_match._description,
unit=view_instrument_match._instrument.unit,
data=data,
)
)
)

if len(metrics) == 0:
continue

if instrument.instrumentation_scope not in (
instrumentation_scope_scope_metrics
Expand All @@ -231,13 +235,15 @@ def collect(self) -> MetricsData:
instrument.instrumentation_scope
].metrics.extend(metrics)

scope_metrics = list(instrumentation_scope_scope_metrics.values())
if len(scope_metrics) == 0:
return None

return MetricsData(
resource_metrics=[
ResourceMetrics(
resource=self._sdk_config.resource,
scope_metrics=list(
instrumentation_scope_scope_metrics.values()
),
scope_metrics=scope_metrics,
schema_url=self._sdk_config.resource.schema_url,
)
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,7 @@ def test_disable_default_views(self):
counter.add(10, {"label": "value2"})
counter.add(10, {"label": "value3"})
self.assertEqual(
(
reader.get_metrics_data()
.resource_metrics[0]
.scope_metrics[0]
.metrics
),
[],
None, reader.get_metrics_data()
)

def test_disable_default_views_add_custom(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,15 +314,7 @@ def test_drop_aggregation(self):
)
metric_reader_storage.consume_measurement(Measurement(1, counter))

self.assertEqual(
[],
(
metric_reader_storage.collect()
.resource_metrics[0]
.scope_metrics[0]
.metrics
),
)
self.assertEqual(None, (metric_reader_storage.collect()),)

def test_same_collection_start(self):

Expand Down