Skip to content

Commit 941e355

Browse files
authored
Fix explicit bucket histogram aggregation (#3429)
* Fix explicit bucket histogram aggregation
  Fixes #3407
* Revert "Fix explicit bucket histogram aggregation"
  This reverts commit f1c6683.
* Fix ExplicitBucketHistogramAggregation
  Fixes #3407
* Fix default instrument temporality
* Fix last test case
* Add more test cases
* Test min, max and sum
* Fix lint
* Add CHANGELOG
* Fix lint
* Skip test if running in Windows
1 parent d5fdcd6 commit 941e355

File tree

5 files changed

+386
-97
lines changed

5 files changed

+386
-97
lines changed

CHANGELOG.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## Unreleased
99

10+
- Fix explicit bucket histogram aggregation
11+
([#3429](https://github.com/open-telemetry/opentelemetry-python/pull/3429))
12+
- Add `code.lineno`, `code.function` and `code.filepath` to all logs
13+
([#3645](https://github.com/open-telemetry/opentelemetry-python/pull/3645))
1014
- Add Synchronous Gauge instrument
1115
([#3462](https://github.com/open-telemetry/opentelemetry-python/pull/3462))
1216
- Drop support for 3.7
@@ -19,8 +23,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1923
([#3623](https://github.com/open-telemetry/opentelemetry-python/pull/3623))
2024
- Improve Resource Detector timeout messaging
2125
([#3645](https://github.com/open-telemetry/opentelemetry-python/pull/3645))
22-
- Add `code.lineno`, `code.function` and `code.filepath` to all logs
23-
([#3645](https://github.com/open-telemetry/opentelemetry-python/pull/3645))
2426

2527
## Version 1.22.0/0.43b0 (2023-12-15)
2628

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py

Lines changed: 104 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,7 @@ class _ExplicitBucketHistogramAggregation(_Aggregation[HistogramPoint]):
377377
def __init__(
378378
self,
379379
attributes: Attributes,
380+
instrument_aggregation_temporality: AggregationTemporality,
380381
start_time_unix_nano: int,
381382
boundaries: Sequence[float] = (
382383
0.0,
@@ -398,33 +399,43 @@ def __init__(
398399
record_min_max: bool = True,
399400
):
400401
super().__init__(attributes)
402+
401403
self._boundaries = tuple(boundaries)
402-
self._bucket_counts = self._get_empty_bucket_counts()
404+
self._record_min_max = record_min_max
403405
self._min = inf
404406
self._max = -inf
405407
self._sum = 0
406-
self._record_min_max = record_min_max
408+
407409
self._start_time_unix_nano = start_time_unix_nano
408-
# It is assumed that the "natural" aggregation temporality for a
409-
# Histogram instrument is DELTA, like the "natural" aggregation
410-
# temporality for a Counter is DELTA and the "natural" aggregation
411-
# temporality for an ObservableCounter is CUMULATIVE.
412-
self._instrument_aggregation_temporality = AggregationTemporality.DELTA
410+
self._instrument_aggregation_temporality = (
411+
instrument_aggregation_temporality
412+
)
413+
414+
self._current_value = None
415+
416+
self._previous_collection_start_nano = self._start_time_unix_nano
417+
self._previous_cumulative_value = self._get_empty_bucket_counts()
418+
self._previous_min = inf
419+
self._previous_max = -inf
420+
self._previous_sum = 0
413421

414422
def _get_empty_bucket_counts(self) -> List[int]:
415423
return [0] * (len(self._boundaries) + 1)
416424

417425
def aggregate(self, measurement: Measurement) -> None:
426+
with self._lock:
427+
if self._current_value is None:
428+
self._current_value = self._get_empty_bucket_counts()
418429

419-
value = measurement.value
430+
value = measurement.value
420431

421-
if self._record_min_max:
422-
self._min = min(self._min, value)
423-
self._max = max(self._max, value)
432+
self._sum += value
424433

425-
self._sum += value
434+
if self._record_min_max:
435+
self._min = min(self._min, value)
436+
self._max = max(self._max, value)
426437

427-
self._bucket_counts[bisect_left(self._boundaries, value)] += 1
438+
self._current_value[bisect_left(self._boundaries, value)] += 1
428439

429440
def collect(
430441
self,
@@ -434,84 +445,78 @@ def collect(
434445
"""
435446
Atomically return a point for the current value of the metric.
436447
"""
437-
with self._lock:
438-
if not any(self._bucket_counts):
439-
return None
440448

441-
bucket_counts = self._bucket_counts
442-
start_time_unix_nano = self._start_time_unix_nano
449+
with self._lock:
450+
current_value = self._current_value
443451
sum_ = self._sum
444-
max_ = self._max
445452
min_ = self._min
453+
max_ = self._max
446454

447-
self._bucket_counts = self._get_empty_bucket_counts()
448-
self._start_time_unix_nano = collection_start_nano
455+
self._current_value = None
449456
self._sum = 0
450457
self._min = inf
451458
self._max = -inf
452459

453-
current_point = HistogramDataPoint(
454-
attributes=self._attributes,
455-
start_time_unix_nano=start_time_unix_nano,
456-
time_unix_nano=collection_start_nano,
457-
count=sum(bucket_counts),
458-
sum=sum_,
459-
bucket_counts=tuple(bucket_counts),
460-
explicit_bounds=self._boundaries,
461-
min=min_,
462-
max=max_,
463-
)
460+
if (
461+
self._instrument_aggregation_temporality
462+
is AggregationTemporality.DELTA
463+
):
464+
# This happens when the corresponding instrument for this
465+
# aggregation is synchronous.
466+
if (
467+
collection_aggregation_temporality
468+
is AggregationTemporality.DELTA
469+
):
464470

465-
if self._previous_point is None or (
466-
self._instrument_aggregation_temporality
467-
is collection_aggregation_temporality
468-
):
469-
self._previous_point = current_point
470-
return current_point
471+
if current_value is None:
472+
return None
471473

472-
max_ = current_point.max
473-
min_ = current_point.min
474+
previous_collection_start_nano = (
475+
self._previous_collection_start_nano
476+
)
477+
self._previous_collection_start_nano = (
478+
collection_start_nano
479+
)
474480

475-
if (
476-
collection_aggregation_temporality
477-
is AggregationTemporality.CUMULATIVE
478-
):
479-
start_time_unix_nano = self._previous_point.start_time_unix_nano
480-
sum_ = current_point.sum + self._previous_point.sum
481-
# Only update min/max on delta -> cumulative
482-
max_ = max(current_point.max, self._previous_point.max)
483-
min_ = min(current_point.min, self._previous_point.min)
484-
bucket_counts = [
485-
curr_count + prev_count
486-
for curr_count, prev_count in zip(
487-
current_point.bucket_counts,
488-
self._previous_point.bucket_counts,
489-
)
490-
]
491-
else:
492-
start_time_unix_nano = self._previous_point.time_unix_nano
493-
sum_ = current_point.sum - self._previous_point.sum
494-
bucket_counts = [
495-
curr_count - prev_count
496-
for curr_count, prev_count in zip(
497-
current_point.bucket_counts,
498-
self._previous_point.bucket_counts,
481+
return HistogramDataPoint(
482+
attributes=self._attributes,
483+
start_time_unix_nano=previous_collection_start_nano,
484+
time_unix_nano=collection_start_nano,
485+
count=sum(current_value),
486+
sum=sum_,
487+
bucket_counts=tuple(current_value),
488+
explicit_bounds=self._boundaries,
489+
min=min_,
490+
max=max_,
491+
)
492+
493+
if current_value is None:
494+
current_value = self._get_empty_bucket_counts()
495+
496+
self._previous_cumulative_value = [
497+
current_value_element + previous_cumulative_value_element
498+
for (
499+
current_value_element,
500+
previous_cumulative_value_element,
501+
) in zip(current_value, self._previous_cumulative_value)
502+
]
503+
self._previous_min = min(min_, self._previous_min)
504+
self._previous_max = max(max_, self._previous_max)
505+
self._previous_sum = sum_ + self._previous_sum
506+
507+
return HistogramDataPoint(
508+
attributes=self._attributes,
509+
start_time_unix_nano=self._start_time_unix_nano,
510+
time_unix_nano=collection_start_nano,
511+
count=sum(self._previous_cumulative_value),
512+
sum=self._previous_sum,
513+
bucket_counts=tuple(self._previous_cumulative_value),
514+
explicit_bounds=self._boundaries,
515+
min=self._previous_min,
516+
max=self._previous_max,
499517
)
500-
]
501518

502-
current_point = HistogramDataPoint(
503-
attributes=self._attributes,
504-
start_time_unix_nano=start_time_unix_nano,
505-
time_unix_nano=current_point.time_unix_nano,
506-
count=sum(bucket_counts),
507-
sum=sum_,
508-
bucket_counts=tuple(bucket_counts),
509-
explicit_bounds=current_point.explicit_bounds,
510-
min=min_,
511-
max=max_,
512-
)
513-
self._previous_point = current_point
514-
return current_point
519+
return None
515520

516521

517522
# pylint: disable=protected-access
@@ -1100,7 +1105,11 @@ def _create_aggregation(
11001105

11011106
if isinstance(instrument, Histogram):
11021107
return _ExplicitBucketHistogramAggregation(
1103-
attributes, start_time_unix_nano
1108+
attributes,
1109+
instrument_aggregation_temporality=(
1110+
AggregationTemporality.DELTA
1111+
),
1112+
start_time_unix_nano=start_time_unix_nano,
11041113
)
11051114

11061115
if isinstance(instrument, ObservableGauge):
@@ -1179,8 +1188,18 @@ def _create_aggregation(
11791188
attributes: Attributes,
11801189
start_time_unix_nano: int,
11811190
) -> _Aggregation:
1191+
1192+
instrument_aggregation_temporality = AggregationTemporality.UNSPECIFIED
1193+
if isinstance(instrument, Synchronous):
1194+
instrument_aggregation_temporality = AggregationTemporality.DELTA
1195+
elif isinstance(instrument, Asynchronous):
1196+
instrument_aggregation_temporality = (
1197+
AggregationTemporality.CUMULATIVE
1198+
)
1199+
11821200
return _ExplicitBucketHistogramAggregation(
11831201
attributes,
1202+
instrument_aggregation_temporality,
11841203
start_time_unix_nano,
11851204
self._boundaries,
11861205
self._record_min_max,
@@ -1200,16 +1219,18 @@ def _create_aggregation(
12001219
start_time_unix_nano: int,
12011220
) -> _Aggregation:
12021221

1203-
temporality = AggregationTemporality.UNSPECIFIED
1222+
instrument_aggregation_temporality = AggregationTemporality.UNSPECIFIED
12041223
if isinstance(instrument, Synchronous):
1205-
temporality = AggregationTemporality.DELTA
1224+
instrument_aggregation_temporality = AggregationTemporality.DELTA
12061225
elif isinstance(instrument, Asynchronous):
1207-
temporality = AggregationTemporality.CUMULATIVE
1226+
instrument_aggregation_temporality = (
1227+
AggregationTemporality.CUMULATIVE
1228+
)
12081229

12091230
return _SumAggregation(
12101231
attributes,
12111232
isinstance(instrument, (Counter, ObservableCounter)),
1212-
temporality,
1233+
instrument_aggregation_temporality,
12131234
start_time_unix_nano,
12141235
)
12151236

0 commit comments

Comments
 (0)