
Commit f1c6683

Fix explicit bucket histogram aggregation
Fixes open-telemetry#3407
1 parent 1625b35 commit f1c6683

4 files changed: +313 -44 lines
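
The FIXME removed from test_histogram_export.py further down spells out the bug this commit fixes: ExplicitBucketHistogramAggregation was resetting its bucket counts to zero even when the aggregation temporality was cumulative. A minimal sketch of how the fixed behavior can be observed through the public SDK API (the meter and instrument names are placeholders, the bucket positions assume the SDK's default explicit bucket boundaries, and the reader wiring mirrors the new tests added below):

# Minimal sketch, not part of the commit: with CUMULATIVE temporality,
# bucket counts should keep accumulating across collections instead of
# being reset to zero after each one.
from opentelemetry.sdk.metrics import Histogram, MeterProvider
from opentelemetry.sdk.metrics.export import (
    AggregationTemporality,
    InMemoryMetricReader,
)

reader = InMemoryMetricReader(
    preferred_temporality={Histogram: AggregationTemporality.CUMULATIVE},
)
provider = MeterProvider(metric_readers=[reader])
histogram = provider.get_meter("example").create_histogram("histogram")

histogram.record(1.0)
reader.get_metrics_data()  # first collection

histogram.record(6.0)
point = (
    reader.get_metrics_data()  # second collection
    .resource_metrics[0]
    .scope_metrics[0]
    .metrics[0]
    .data.data_points[0]
)
# Both measurements are still counted in the cumulative point: assuming the
# default boundaries (0, 5, 10, ...), 1.0 lands in bucket 1 and 6.0 in
# bucket 2, so bucket_counts begins with (0, 1, 1, ...).
print(point.bucket_counts)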

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py

Lines changed: 47 additions & 37 deletions
@@ -397,8 +397,12 @@ def __init__(
         record_min_max: bool = True,
     ):
         super().__init__(attributes)
+
+        self._start_time_unix_nano = start_time_unix_nano
         self._boundaries = tuple(boundaries)
-        self._bucket_counts = self._get_empty_bucket_counts()
+        self._record_min_max = record_min_max
+
+        self._current_value = None
         self._min = inf
         self._max = -inf
         self._sum = 0
@@ -415,15 +419,19 @@ def _get_empty_bucket_counts(self) -> List[int]:
 
     def aggregate(self, measurement: Measurement) -> None:
 
-        value = measurement.value
+        with self._lock:
 
-        if self._record_min_max:
-            self._min = min(self._min, value)
-            self._max = max(self._max, value)
+            if self._current_value is None:
+                self._current_value = self._get_empty_bucket_counts()
 
-        self._sum += value
+            value = measurement.value
 
-        self._bucket_counts[bisect_left(self._boundaries, value)] += 1
+            if self._record_min_max:
+                self._min = min(self._min, value)
+                self._max = max(self._max, value)
+
+            self._current_value[bisect_left(self._boundaries, value)] += 1
+            self._sum = self._sum + value
 
     def collect(
         self,
@@ -434,14 +442,8 @@ def collect(
         Atomically return a point for the current value of the metric.
         """
         with self._lock:
-            if not any(self._bucket_counts):
-                return None
 
-            bucket_counts = self._bucket_counts
-            start_time_unix_nano = self._start_time_unix_nano
-            sum_ = self._sum
-            max_ = self._max
-            min_ = self._min
+            if aggregation_temporality is AggregationTemporality.DELTA:
 
                 self._bucket_counts = self._get_empty_bucket_counts()
                 self._start_time_unix_nano = collection_start_nano
@@ -486,31 +488,39 @@ def collect(
                     current_point.bucket_counts,
                     self._previous_point.bucket_counts,
                 )
-            ]
-        else:
-            start_time_unix_nano = self._previous_point.time_unix_nano
-            sum_ = current_point.sum - self._previous_point.sum
-            bucket_counts = [
-                curr_count - prev_count
-                for curr_count, prev_count in zip(
-                    current_point.bucket_counts,
-                    self._previous_point.bucket_counts,
+
+                self._current_value = None
+                self._min = inf
+                self._max = -inf
+                self._sum = 0
+                self._previous_collection_start_nano = collection_start_nano
+
+                if current_value is None:
+                    return None
+
+                return HistogramDataPoint(
+                    attributes=self._attributes,
+                    start_time_unix_nano=previous_collection_start_nano,
+                    time_unix_nano=collection_start_nano,
+                    count=sum(current_value),
+                    sum=sum_,
+                    bucket_counts=tuple(current_value),
+                    explicit_bounds=self._boundaries,
+                    min=min_,
+                    max=max_,
                 )
-            ]
 
-        current_point = HistogramDataPoint(
-            attributes=self._attributes,
-            start_time_unix_nano=start_time_unix_nano,
-            time_unix_nano=current_point.time_unix_nano,
-            count=sum(bucket_counts),
-            sum=sum_,
-            bucket_counts=tuple(bucket_counts),
-            explicit_bounds=current_point.explicit_bounds,
-            min=min_,
-            max=max_,
-        )
-        self._previous_point = current_point
-        return current_point
+            return HistogramDataPoint(
+                attributes=self._attributes,
+                start_time_unix_nano=self._start_time_unix_nano,
+                time_unix_nano=collection_start_nano,
+                count=sum(self._current_value),
+                sum=self._sum,
+                bucket_counts=tuple(self._current_value),
+                explicit_bounds=self._boundaries,
+                min=self._min,
+                max=self._max,
+            )
 
 
 # pylint: disable=protected-access
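
Taken together, the hunks above drop the previous-point bookkeeping: measurements now accumulate into self._current_value under the instrument's lock, a DELTA collection snapshots and resets that state while advancing the start timestamp, and the non-DELTA path builds its point from self._current_value and the original start time without resetting anything. A condensed, standalone sketch of the delta pattern (illustrative only, with simplified names; not the SDK's exact code):

# Illustrative sketch of the snapshot-and-reset delta collection visible in
# the diff above; names are simplified and this is not the SDK's exact code.
from bisect import bisect_left
from threading import Lock


class _SketchHistogram:
    def __init__(self, boundaries, start_time_unix_nano):
        self._lock = Lock()
        self._boundaries = tuple(boundaries)
        self._start_time_unix_nano = start_time_unix_nano
        self._current_value = None  # created lazily on first measurement
        self._sum = 0

    def aggregate(self, value):
        with self._lock:
            if self._current_value is None:
                self._current_value = [0] * (len(self._boundaries) + 1)
            self._current_value[bisect_left(self._boundaries, value)] += 1
            self._sum += value

    def collect_delta(self, collection_start_nano):
        with self._lock:
            current_value = self._current_value
            sum_ = self._sum
            start_time_unix_nano = self._start_time_unix_nano

            # Reset so the next interval starts from zero and from the
            # current collection timestamp.
            self._current_value = None
            self._sum = 0
            self._start_time_unix_nano = collection_start_nano

        if current_value is None:
            return None  # nothing was recorded during this interval
        return (
            start_time_unix_nano,
            collection_start_nano,
            tuple(current_value),
            sum_,
        )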
Lines changed: 250 additions & 0 deletions
@@ -0,0 +1,250 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from platform import system
from unittest import TestCase

from pytest import mark

from opentelemetry.sdk.metrics import Histogram, MeterProvider
from opentelemetry.sdk.metrics.export import (
    AggregationTemporality,
    InMemoryMetricReader,
)
from opentelemetry.sdk.metrics.view import ExplicitBucketHistogramAggregation


class TestExplicitBucketHistogramAggregation(TestCase):

    values = [
        1.0,
        6.0,
        11.0,
        26.0,
        51.0,
        76.0,
        101.0,
        251.0,
        501.0,
        751.0,
    ]

    @mark.skipif(
        system() != "Linux",
        reason=(
            "Tests fail because Windows time_ns resolution is too low so "
            "two different time measurements may end up having the exact same"
            "value."
        ),
    )
    def test_synchronous_delta_temporality(self):

        aggregation = ExplicitBucketHistogramAggregation()

        reader = InMemoryMetricReader(
            preferred_aggregation={Histogram: aggregation},
            preferred_temporality={Histogram: AggregationTemporality.DELTA},
        )

        provider = MeterProvider(metric_readers=[reader])
        meter = provider.get_meter("name", "version")

        histogram = meter.create_histogram("histogram")

        results = []

        for _ in range(10):

            results.append(reader.get_metrics_data())

        for metrics_data in results:
            self.assertIsNone(metrics_data)

        results = []

        for value in self.values:
            histogram.record(value)
            results.append(reader.get_metrics_data())

        previous_time_unix_nano = (
            results[0]
            .resource_metrics[0]
            .scope_metrics[0]
            .metrics[0]
            .data.data_points[0]
            .time_unix_nano
        )

        self.assertEqual(
            (
                results[0]
                .resource_metrics[0]
                .scope_metrics[0]
                .metrics[0]
                .data.data_points[0]
                .bucket_counts
            ),
            (0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0),
        )

        self.assertLess(
            (
                results[0]
                .resource_metrics[0]
                .scope_metrics[0]
                .metrics[0]
                .data.data_points[0]
                .start_time_unix_nano
            ),
            previous_time_unix_nano,
        )

        for index, metrics_data in enumerate(results[1:]):

            metric_data = (
                metrics_data.resource_metrics[0]
                .scope_metrics[0]
                .metrics[0]
                .data.data_points[0]
            )

            self.assertEqual(
                previous_time_unix_nano, metric_data.start_time_unix_nano
            )
            previous_time_unix_nano = metric_data.time_unix_nano

            self.assertEqual(
                metric_data.bucket_counts,
                tuple([1 if value == index + 2 else 0 for value in range(16)]),
            )
            self.assertLess(
                metric_data.start_time_unix_nano, metric_data.time_unix_nano
            )

        results = []

        for _ in range(10):

            results.append(reader.get_metrics_data())

        provider.shutdown()

        for metrics_data in results:
            self.assertIsNone(metrics_data)

    @mark.skipif(
        system() != "Linux",
        reason=(
            "Tests fail because Windows time_ns resolution is too low so "
            "two different time measurements may end up having the exact same"
            "value."
        ),
    )
    def test_synchronous_cumulative_temporality(self):

        aggregation = ExplicitBucketHistogramAggregation()

        reader = InMemoryMetricReader(
            preferred_aggregation={Histogram: aggregation},
            preferred_temporality={
                Histogram: AggregationTemporality.CUMULATIVE
            },
        )

        provider = MeterProvider(metric_readers=[reader])
        meter = provider.get_meter("name", "version")

        histogram = meter.create_histogram("histogram")

        results = []

        for _ in range(10):

            results.append(reader.get_metrics_data())

        for metrics_data in results:
            self.assertIsNone(metrics_data)

        results = []

        for value in self.values:
            histogram.record(value)
            results.append(reader.get_metrics_data())

        start_time_unix_nano = (
            results[0]
            .resource_metrics[0]
            .scope_metrics[0]
            .metrics[0]
            .data.data_points[0]
            .start_time_unix_nano
        )

        for index, metrics_data in enumerate(results):

            metric_data = (
                metrics_data.resource_metrics[0]
                .scope_metrics[0]
                .metrics[0]
                .data.data_points[0]
            )

            self.assertEqual(
                start_time_unix_nano, metric_data.start_time_unix_nano
            )
            self.assertEqual(
                metric_data.bucket_counts,
                tuple(
                    [
                        1
                        if inner_index <= index + 1 and inner_index > 0
                        else 0
                        for inner_index, value in enumerate(range(16))
                    ]
                ),
            )

        results = []

        for _ in range(10):

            results.append(reader.get_metrics_data())

        provider.shutdown()

        start_time_unix_nano = (
            results[0]
            .resource_metrics[0]
            .scope_metrics[0]
            .metrics[0]
            .data.data_points[0]
            .start_time_unix_nano
        )

        for metrics_data in results:

            metric_data = (
                metrics_data.resource_metrics[0]
                .scope_metrics[0]
                .metrics[0]
                .data.data_points[0]
            )

            self.assertEqual(
                start_time_unix_nano, metric_data.start_time_unix_nano
            )
            self.assertEqual(
                metric_data.bucket_counts,
                (0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0),
            )
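
The expected tuples in these tests follow from bisect_left over the aggregation's bucket boundaries: each successive value in self.values lands one bucket further to the right, so the delta test sees a single moving count while the cumulative test sees the counts pile up. A small worked check, assuming the SDK's default explicit bucket boundaries (0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000), which yield 16 buckets:

# Worked check of the expected bucket indices, assuming the SDK's default
# explicit bucket boundaries (15 boundaries -> 16 buckets).
from bisect import bisect_left

boundaries = (
    0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000
)
values = [1.0, 6.0, 11.0, 26.0, 51.0, 76.0, 101.0, 251.0, 501.0, 751.0]

for index, value in enumerate(values):
    # The first value falls in bucket 1, the second in bucket 2, and so on,
    # matching the tuples asserted in the delta and cumulative tests above.
    assert bisect_left(boundaries, value) == index + 1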

opentelemetry-sdk/tests/metrics/integration_test/test_histogram_export.py

Lines changed: 12 additions & 3 deletions
@@ -65,17 +65,26 @@ def test_histogram_counter_collection(self):
 
         metric_data = in_memory_metric_reader.get_metrics_data()
 
-        # FIXME ExplicitBucketHistogramAggregation is resetting counts to zero
-        # even if aggregation temporality is cumulative.
         self.assertEqual(
-            len(metric_data.resource_metrics[0].scope_metrics[0].metrics), 1
+            len(metric_data.resource_metrics[0].scope_metrics[0].metrics), 2
         )
+
         self.assertEqual(
             (
                 metric_data.resource_metrics[0]
                 .scope_metrics[0]
                 .metrics[0]
                 .data.data_points[0]
+                .bucket_counts
+            ),
+            (0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0),
+        )
+        self.assertEqual(
+            (
+                metric_data.resource_metrics[0]
+                .scope_metrics[0]
+                .metrics[1]
+                .data.data_points[0]
                 .value
             ),
             1,
