Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Fix explicit bucket histogram aggregation
([#3429](https://github.com/open-telemetry/opentelemetry-python/pull/3429))
- Add `code.lineno`, `code.function` and `code.filepath` to all logs
([#3645](https://github.com/open-telemetry/opentelemetry-python/pull/3645))
- Add Synchronous Gauge instrument
([#3462](https://github.com/open-telemetry/opentelemetry-python/pull/3462))
- Drop support for 3.7
Expand All @@ -19,8 +23,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#3623](https://github.com/open-telemetry/opentelemetry-python/pull/3623))
- Improve Resource Detector timeout messaging
([#3645](https://github.com/open-telemetry/opentelemetry-python/pull/3645))
- Add `code.lineno`, `code.function` and `code.filepath` to all logs
([#3645](https://github.com/open-telemetry/opentelemetry-python/pull/3645))

## Version 1.22.0/0.43b0 (2023-12-15)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ class _ExplicitBucketHistogramAggregation(_Aggregation[HistogramPoint]):
def __init__(
self,
attributes: Attributes,
instrument_aggregation_temporality: AggregationTemporality,
start_time_unix_nano: int,
boundaries: Sequence[float] = (
0.0,
Expand All @@ -398,33 +399,43 @@ def __init__(
record_min_max: bool = True,
):
super().__init__(attributes)

self._boundaries = tuple(boundaries)
self._bucket_counts = self._get_empty_bucket_counts()
self._record_min_max = record_min_max
self._min = inf
self._max = -inf
self._sum = 0
self._record_min_max = record_min_max

self._start_time_unix_nano = start_time_unix_nano
# It is assumed that the "natural" aggregation temporality for a
# Histogram instrument is DELTA, like the "natural" aggregation
# temporality for a Counter is DELTA and the "natural" aggregation
# temporality for an ObservableCounter is CUMULATIVE.
self._instrument_aggregation_temporality = AggregationTemporality.DELTA
self._instrument_aggregation_temporality = (
instrument_aggregation_temporality
)

self._current_value = None

self._previous_collection_start_nano = self._start_time_unix_nano
self._previous_cumulative_value = self._get_empty_bucket_counts()
self._previous_min = inf
self._previous_max = -inf
self._previous_sum = 0

def _get_empty_bucket_counts(self) -> List[int]:
return [0] * (len(self._boundaries) + 1)

def aggregate(self, measurement: Measurement) -> None:
with self._lock:
if self._current_value is None:
self._current_value = self._get_empty_bucket_counts()

value = measurement.value
value = measurement.value

if self._record_min_max:
self._min = min(self._min, value)
self._max = max(self._max, value)
self._sum += value

self._sum += value
if self._record_min_max:
self._min = min(self._min, value)
self._max = max(self._max, value)

self._bucket_counts[bisect_left(self._boundaries, value)] += 1
self._current_value[bisect_left(self._boundaries, value)] += 1

def collect(
self,
Expand All @@ -434,84 +445,78 @@ def collect(
"""
Atomically return a point for the current value of the metric.
"""
with self._lock:
if not any(self._bucket_counts):
return None

bucket_counts = self._bucket_counts
start_time_unix_nano = self._start_time_unix_nano
with self._lock:
current_value = self._current_value
sum_ = self._sum
max_ = self._max
min_ = self._min
max_ = self._max

self._bucket_counts = self._get_empty_bucket_counts()
self._start_time_unix_nano = collection_start_nano
self._current_value = None
self._sum = 0
self._min = inf
self._max = -inf

current_point = HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=start_time_unix_nano,
time_unix_nano=collection_start_nano,
count=sum(bucket_counts),
sum=sum_,
bucket_counts=tuple(bucket_counts),
explicit_bounds=self._boundaries,
min=min_,
max=max_,
)
if (
self._instrument_aggregation_temporality
is AggregationTemporality.DELTA
):
# This happens when the corresponding instrument for this
# aggregation is synchronous.
if (
collection_aggregation_temporality
is AggregationTemporality.DELTA
):

if self._previous_point is None or (
self._instrument_aggregation_temporality
is collection_aggregation_temporality
):
self._previous_point = current_point
return current_point
if current_value is None:
return None

max_ = current_point.max
min_ = current_point.min
previous_collection_start_nano = (
self._previous_collection_start_nano
)
self._previous_collection_start_nano = (
collection_start_nano
)

if (
collection_aggregation_temporality
is AggregationTemporality.CUMULATIVE
):
start_time_unix_nano = self._previous_point.start_time_unix_nano
sum_ = current_point.sum + self._previous_point.sum
# Only update min/max on delta -> cumulative
max_ = max(current_point.max, self._previous_point.max)
min_ = min(current_point.min, self._previous_point.min)
bucket_counts = [
curr_count + prev_count
for curr_count, prev_count in zip(
current_point.bucket_counts,
self._previous_point.bucket_counts,
)
]
else:
start_time_unix_nano = self._previous_point.time_unix_nano
sum_ = current_point.sum - self._previous_point.sum
bucket_counts = [
curr_count - prev_count
for curr_count, prev_count in zip(
current_point.bucket_counts,
self._previous_point.bucket_counts,
return HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=previous_collection_start_nano,
time_unix_nano=collection_start_nano,
count=sum(current_value),
sum=sum_,
bucket_counts=tuple(current_value),
explicit_bounds=self._boundaries,
min=min_,
max=max_,
)

if current_value is None:
current_value = self._get_empty_bucket_counts()

self._previous_cumulative_value = [
current_value_element + previous_cumulative_value_element
for (
current_value_element,
previous_cumulative_value_element,
) in zip(current_value, self._previous_cumulative_value)
]
self._previous_min = min(min_, self._previous_min)
self._previous_max = max(max_, self._previous_max)
self._previous_sum = sum_ + self._previous_sum

return HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=self._start_time_unix_nano,
time_unix_nano=collection_start_nano,
count=sum(self._previous_cumulative_value),
sum=self._previous_sum,
bucket_counts=tuple(self._previous_cumulative_value),
explicit_bounds=self._boundaries,
min=self._previous_min,
max=self._previous_max,
)
]

current_point = HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=start_time_unix_nano,
time_unix_nano=current_point.time_unix_nano,
count=sum(bucket_counts),
sum=sum_,
bucket_counts=tuple(bucket_counts),
explicit_bounds=current_point.explicit_bounds,
min=min_,
max=max_,
)
self._previous_point = current_point
return current_point
return None


# pylint: disable=protected-access
Expand Down Expand Up @@ -1100,7 +1105,11 @@ def _create_aggregation(

if isinstance(instrument, Histogram):
return _ExplicitBucketHistogramAggregation(
attributes, start_time_unix_nano
attributes,
instrument_aggregation_temporality=(
AggregationTemporality.DELTA
),
start_time_unix_nano=start_time_unix_nano,
)

if isinstance(instrument, ObservableGauge):
Expand Down Expand Up @@ -1179,8 +1188,18 @@ def _create_aggregation(
attributes: Attributes,
start_time_unix_nano: int,
) -> _Aggregation:

instrument_aggregation_temporality = AggregationTemporality.UNSPECIFIED
if isinstance(instrument, Synchronous):
instrument_aggregation_temporality = AggregationTemporality.DELTA
elif isinstance(instrument, Asynchronous):
instrument_aggregation_temporality = (
AggregationTemporality.CUMULATIVE
)

return _ExplicitBucketHistogramAggregation(
attributes,
instrument_aggregation_temporality,
start_time_unix_nano,
self._boundaries,
self._record_min_max,
Expand All @@ -1200,16 +1219,18 @@ def _create_aggregation(
start_time_unix_nano: int,
) -> _Aggregation:

temporality = AggregationTemporality.UNSPECIFIED
instrument_aggregation_temporality = AggregationTemporality.UNSPECIFIED
if isinstance(instrument, Synchronous):
temporality = AggregationTemporality.DELTA
instrument_aggregation_temporality = AggregationTemporality.DELTA
elif isinstance(instrument, Asynchronous):
temporality = AggregationTemporality.CUMULATIVE
instrument_aggregation_temporality = (
AggregationTemporality.CUMULATIVE
)

return _SumAggregation(
attributes,
isinstance(instrument, (Counter, ObservableCounter)),
temporality,
instrument_aggregation_temporality,
start_time_unix_nano,
)

Expand Down
Loading