Skip to content

Commit c51244f

Browse files
committed
Fix memory leak
1 parent 2bcbbcc commit c51244f

File tree

3 files changed

+82
-3
lines changed

3 files changed

+82
-3
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
14+
import weakref
1515
from atexit import register, unregister
1616
from logging import getLogger
1717
from os import environ
@@ -393,7 +393,7 @@ class MeterProvider(APIMeterProvider):
393393
"""
394394

395395
_all_metric_readers_lock = Lock()
396-
_all_metric_readers = set()
396+
_all_metric_readers = weakref.WeakSet()
397397

398398
def __init__(
399399
self,

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import math
1616
import os
17+
import weakref
1718
from abc import ABC, abstractmethod
1819
from enum import Enum
1920
from logging import getLogger
@@ -490,8 +491,10 @@ def __init__(
490491
)
491492
self._daemon_thread.start()
492493
if hasattr(os, "register_at_fork"):
494+
weak_at_fork = weakref.WeakMethod(self._at_fork_reinit)
495+
493496
os.register_at_fork(
494-
after_in_child=self._at_fork_reinit
497+
after_in_child=lambda: weak_at_fork()()
495498
) # pylint: disable=protected-access
496499
elif self._export_interval_millis <= 0:
497500
raise ValueError(
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import gc
16+
import time
17+
import weakref
18+
from typing import Sequence
19+
from unittest import TestCase
20+
21+
from opentelemetry.proto.metrics.v1.metrics_pb2 import Metric
22+
from opentelemetry.sdk.metrics import MeterProvider
23+
from opentelemetry.sdk.metrics._internal.export import MetricExporter, MetricExportResult, PeriodicExportingMetricReader
24+
25+
26+
class FakeMetricsExporter(MetricExporter):
27+
def __init__(self, wait=0, preferred_temporality=None, preferred_aggregation=None):
28+
self.wait = wait
29+
self.metrics = []
30+
self._shutdown = False
31+
super().__init__(
32+
preferred_temporality=preferred_temporality,
33+
preferred_aggregation=preferred_aggregation,
34+
)
35+
36+
def export(
37+
self,
38+
metrics_data: Sequence[Metric],
39+
timeout_millis: float = 10_000,
40+
**kwargs,
41+
) -> MetricExportResult:
42+
time.sleep(self.wait)
43+
self.metrics.extend(metrics_data)
44+
return True
45+
46+
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
47+
self._shutdown = True
48+
49+
def force_flush(self, timeout_millis: float = 10_000) -> bool:
50+
return True
51+
52+
53+
class TestMeterProviderShutdown(TestCase):
54+
def test_meter_provider_shutdown_triggers_garbage_collection(self):
55+
def create_and_shutdown():
56+
exporter = FakeMetricsExporter()
57+
exporter_wr = weakref.ref(exporter)
58+
59+
reader = PeriodicExportingMetricReader(exporter)
60+
reader_wr = weakref.ref(reader)
61+
62+
provider = MeterProvider(metric_readers=[reader])
63+
provider_wr = weakref.ref(provider)
64+
65+
provider.shutdown()
66+
67+
return exporter_wr, reader_wr, provider_wr
68+
69+
# When: the provider is shutdown
70+
exporter_weakref, reader_weakref, provider_weakref = create_and_shutdown()
71+
gc.collect()
72+
73+
# Then: the provider, exporter and reader should be garbage collected
74+
self.assertIsNone(exporter_weakref())
75+
self.assertIsNone(reader_weakref())
76+
self.assertIsNone(provider_weakref())

0 commit comments

Comments
 (0)