From b288635cb37441ebbf97334ce5aed53980f1007b Mon Sep 17 00:00:00 2001 From: Taegyun Kim Date: Fri, 12 Sep 2025 17:44:56 -0400 Subject: [PATCH 1/5] refactor(profiling): migrate to product interface --- ddtrace/bootstrap/preload.py | 8 ----- ddtrace/profiling/__init__.py | 7 +++- ddtrace/profiling/__init__.pyi | 3 ++ ddtrace/profiling/collector/threading.py | 10 ++++-- ddtrace/profiling/product.py | 34 +++++++++++++++++++ ddtrace/profiling/profiler.py | 30 ++-------------- pyproject.toml | 1 + tests/profiling/gevent_fork.py | 2 +- tests/profiling/simple_program_fork.py | 2 +- tests/profiling/test_main.py | 8 ++--- tests/profiling/test_uwsgi.py | 19 +++++++++-- tests/profiling/uwsgi-app.py | 10 +++++- .../profiling_v2/collector/test_threading.py | 1 + tests/profiling_v2/simple_program_fork.py | 2 +- tests/profiling_v2/test_main.py | 1 + 15 files changed, 86 insertions(+), 52 deletions(-) create mode 100644 ddtrace/profiling/__init__.pyi create mode 100644 ddtrace/profiling/product.py diff --git a/ddtrace/bootstrap/preload.py b/ddtrace/bootstrap/preload.py index 50239be9cd8..2593f8e2e8b 100644 --- a/ddtrace/bootstrap/preload.py +++ b/ddtrace/bootstrap/preload.py @@ -53,14 +53,6 @@ def register_post_preload(func: t.Callable) -> None: except Exception: log.error("failed to enable crashtracking", exc_info=True) - -if profiling_config.enabled: - log.debug("profiler enabled via environment variable") - try: - import ddtrace.profiling.auto # noqa: F401 - except Exception: - log.error("failed to enable profiling", exc_info=True) - if config._runtime_metrics_enabled: RuntimeWorker.enable() diff --git a/ddtrace/profiling/__init__.py b/ddtrace/profiling/__init__.py index 3e96d7a6685..5a69eadfb90 100644 --- a/ddtrace/profiling/__init__.py +++ b/ddtrace/profiling/__init__.py @@ -1 +1,6 @@ -from .profiler import Profiler # noqa:F401 +from ddtrace.internal.module import lazy + + +@lazy +def _(): + from ddtrace.profiling.profiler import Profiler # noqa:F401 diff --git a/ddtrace/profiling/__init__.pyi b/ddtrace/profiling/__init__.pyi new file mode 100644 index 00000000000..72fb6fc4e0c --- /dev/null +++ b/ddtrace/profiling/__init__.pyi @@ -0,0 +1,3 @@ +class Profiler: + def start(self) -> None: ... + def stop(self, flush: bool = True) -> None: ... diff --git a/ddtrace/profiling/collector/threading.py b/ddtrace/profiling/collector/threading.py index ab6bf66e6f3..c40bafd256d 100644 --- a/ddtrace/profiling/collector/threading.py +++ b/ddtrace/profiling/collector/threading.py @@ -19,8 +19,9 @@ class ThreadingLockCollector(_lock.LockCollector): PROFILED_LOCK_CLASS = _ProfiledThreadingLock - def _get_patch_target(self): - # type: (...) -> typing.Any + def _get_patch_target(self) -> typing.Any: + import threading + return threading.Lock def _set_patch_target( @@ -32,7 +33,10 @@ def _set_patch_target( # Also patch threading.Thread so echion can track thread lifetimes -def init_stack_v2(): +def init_stack_v2() -> None: + import threading + from threading import Thread + if config.stack.v2_enabled and stack_v2.is_available: _thread_set_native_id = ddtrace_threading.Thread._set_native_id _thread_bootstrap_inner = ddtrace_threading.Thread._bootstrap_inner diff --git a/ddtrace/profiling/product.py b/ddtrace/profiling/product.py new file mode 100644 index 00000000000..f09a290eded --- /dev/null +++ b/ddtrace/profiling/product.py @@ -0,0 +1,34 @@ +from ddtrace.settings.profiling import config + + +def post_preload(): + pass + + +def start(): + if config.enabled: + import ddtrace.profiling.auto # noqa: F401 + + +def restart(join=False): + if config.enabled: + from ddtrace.profiling import bootstrap + + try: + bootstrap.profiler._restart_on_fork() + except AttributeError: + pass + + +def stop(join=False): + if config.enabled: + from ddtrace.profiling import bootstrap + + try: + bootstrap.profiler.stop() + except AttributeError: + pass + + +def at_exit(join=False): + stop(join=join) diff --git a/ddtrace/profiling/profiler.py b/ddtrace/profiling/profiler.py index 48f953e7856..187601deea3 100644 --- a/ddtrace/profiling/profiler.py +++ b/ddtrace/profiling/profiler.py @@ -10,14 +10,9 @@ import ddtrace from ddtrace import config -from ddtrace.internal import atexit -from ddtrace.internal import forksafe from ddtrace.internal import service -from ddtrace.internal import uwsgi from ddtrace.internal.datadog.profiling import ddup from ddtrace.internal.module import ModuleWatchdog -from ddtrace.internal.telemetry import telemetry_writer -from ddtrace.internal.telemetry.constants import TELEMETRY_APM_PRODUCT from ddtrace.profiling import collector from ddtrace.profiling import scheduler from ddtrace.profiling.collector import asyncio @@ -43,39 +38,18 @@ class Profiler(object): def __init__(self, *args, **kwargs): self._profiler = _ProfilerInstance(*args, **kwargs) - def start(self, stop_on_exit=True, profile_children=True): - """Start the profiler. - - :param stop_on_exit: Whether to stop the profiler and flush the profile on exit. - :param profile_children: Whether to start a profiler in child processes. - """ - - if profile_children: - try: - uwsgi.check_uwsgi(self._restart_on_fork, atexit=self.stop if stop_on_exit else None) - except uwsgi.uWSGIMasterProcess: - # Do nothing, the start() method will be called in each worker subprocess - return + def start(self): + """Start the profiler.""" self._profiler.start() - if stop_on_exit: - atexit.register(self.stop) - - if profile_children: - forksafe.register(self._restart_on_fork) - - telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.PROFILER, True) - def stop(self, flush=True): """Stop the profiler. :param flush: Flush last profile. """ - atexit.unregister(self.stop) try: self._profiler.stop(flush) - telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.PROFILER, False) except service.ServiceStatusError: # Not a best practice, but for backward API compatibility that allowed to call `stop` multiple times. pass diff --git a/pyproject.toml b/pyproject.toml index 80b0eddd929..fc913c18b8f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -72,6 +72,7 @@ ddtrace = "ddtrace.contrib.internal.pytest.plugin" "dynamic-instrumentation" = "ddtrace.debugging._products.dynamic_instrumentation" "exception-replay" = "ddtrace.debugging._products.exception_replay" "live-debugger" = "ddtrace.debugging._products.live_debugger" +"profiling" = "ddtrace.profiling.product" "error-tracking" = "ddtrace.errortracking.product" "remote-configuration" = "ddtrace.internal.remoteconfig.products.client" "symbol-database" = "ddtrace.internal.symbol_db.product" diff --git a/tests/profiling/gevent_fork.py b/tests/profiling/gevent_fork.py index a648ca7f80c..86d681019dd 100644 --- a/tests/profiling/gevent_fork.py +++ b/tests/profiling/gevent_fork.py @@ -9,7 +9,7 @@ from ddtrace.profiling import profiler # noqa:E402,F401 -p = profiler.Profiler().start(profile_children=True) +p = profiler.Profiler().start() pid = os.fork() if pid == 0: diff --git a/tests/profiling/simple_program_fork.py b/tests/profiling/simple_program_fork.py index ad8c0541ccd..947f8cb2fbe 100644 --- a/tests/profiling/simple_program_fork.py +++ b/tests/profiling/simple_program_fork.py @@ -2,8 +2,8 @@ import sys import threading +import ddtrace.auto from ddtrace.internal import service -import ddtrace.profiling.auto import ddtrace.profiling.bootstrap import ddtrace.profiling.profiler diff --git a/tests/profiling/test_main.py b/tests/profiling/test_main.py index 23add5ef75b..23e5da72b26 100644 --- a/tests/profiling/test_main.py +++ b/tests/profiling/test_main.py @@ -36,10 +36,7 @@ def test_call_script_gevent(monkeypatch): def test_call_script_pprof_output(tmp_path, monkeypatch): - """This checks if the pprof output and atexit register work correctly. - - The script does not run for one minute, so if the `stop_on_exit` flag is broken, this test will fail. - """ + """This checks if the pprof output and atexit register work correctly.""" filename = str(tmp_path / "pprof") monkeypatch.setenv("DD_PROFILING_OUTPUT_PPROF", filename) monkeypatch.setenv("DD_PROFILING_CAPTURE_PCT", "1") @@ -58,13 +55,14 @@ def test_call_script_pprof_output(tmp_path, monkeypatch): @pytest.mark.skipif(sys.platform == "win32", reason="fork only available on Unix") def test_fork(tmp_path, monkeypatch): filename = str(tmp_path / "pprof") + monkeypatch.setenv("DD_PROFILING_ENABLED", "1") monkeypatch.setenv("DD_PROFILING_API_TIMEOUT", "0.1") monkeypatch.setenv("DD_PROFILING_OUTPUT_PPROF", filename) monkeypatch.setenv("DD_PROFILING_CAPTURE_PCT", "100") stdout, stderr, exitcode, pid = call_program( "python", os.path.join(os.path.dirname(__file__), "simple_program_fork.py") ) - assert exitcode == 0 + assert exitcode == 0, stderr child_pid = stdout.decode().strip() utils.check_pprof_file(filename + "." + str(pid)) utils.check_pprof_file(filename + "." + str(child_pid), sample_type="lock-release") diff --git a/tests/profiling/test_uwsgi.py b/tests/profiling/test_uwsgi.py index 79b20e917a8..689a7e312bd 100644 --- a/tests/profiling/test_uwsgi.py +++ b/tests/profiling/test_uwsgi.py @@ -42,8 +42,11 @@ def uwsgi(monkeypatch): def test_uwsgi_threads_disabled(uwsgi): proc = uwsgi() - stdout, _ = proc.communicate() - assert proc.wait() != 0 + try: + stdout, _ = proc.communicate(timeout=1) + except TimeoutExpired: + proc.terminate() + stdout, _ = proc.communicate() assert THREADS_MSG in stdout @@ -59,6 +62,7 @@ def test_uwsgi_threads_number_set(uwsgi): def test_uwsgi_threads_enabled(uwsgi, tmp_path, monkeypatch): filename = str(tmp_path / "uwsgi.pprof") + monkeypatch.setenv("DD_PROFILING_ENABLED", "1") monkeypatch.setenv("DD_PROFILING_OUTPUT_PPROF", filename) proc = uwsgi("--enable-threads") worker_pids = _get_worker_pids(proc.stdout, 1) @@ -71,8 +75,14 @@ def test_uwsgi_threads_enabled(uwsgi, tmp_path, monkeypatch): def test_uwsgi_threads_processes_no_master(uwsgi, monkeypatch): + monkeypatch.setenv("DD_PROFILING_ENABLED", "1") + monkeypatch.setenv("STOP_AFTER_LOAD", "1") proc = uwsgi("--enable-threads", "--processes", "2") - stdout, _ = proc.communicate() + try: + stdout, _ = proc.communicate(timeout=1) + except TimeoutExpired: + proc.terminate() + stdout, _ = proc.communicate() assert ( b"ddtrace.internal.uwsgi.uWSGIConfigError: master option must be enabled when multiple processes are used" in stdout @@ -101,6 +111,7 @@ def _get_worker_pids(stdout, num_worker, num_app_started=1): def test_uwsgi_threads_processes_master(uwsgi, tmp_path, monkeypatch): filename = str(tmp_path / "uwsgi.pprof") + monkeypatch.setenv("DD_PROFILING_ENABLED", "1") monkeypatch.setenv("DD_PROFILING_OUTPUT_PPROF", filename) proc = uwsgi("--enable-threads", "--master", "--py-call-uwsgi-fork-hooks", "--processes", "2") worker_pids = _get_worker_pids(proc.stdout, 2) @@ -114,6 +125,7 @@ def test_uwsgi_threads_processes_master(uwsgi, tmp_path, monkeypatch): def test_uwsgi_threads_processes_master_lazy_apps(uwsgi, tmp_path, monkeypatch): filename = str(tmp_path / "uwsgi.pprof") + monkeypatch.setenv("DD_PROFILING_ENABLED", "1") monkeypatch.setenv("DD_PROFILING_OUTPUT_PPROF", filename) proc = uwsgi("--enable-threads", "--master", "--processes", "2", "--lazy-apps") worker_pids = _get_worker_pids(proc.stdout, 2, 2) @@ -127,6 +139,7 @@ def test_uwsgi_threads_processes_master_lazy_apps(uwsgi, tmp_path, monkeypatch): def test_uwsgi_threads_processes_no_master_lazy_apps(uwsgi, tmp_path, monkeypatch): filename = str(tmp_path / "uwsgi.pprof") + monkeypatch.setenv("DD_PROFILING_ENABLED", "1") monkeypatch.setenv("DD_PROFILING_OUTPUT_PPROF", filename) proc = uwsgi("--enable-threads", "--processes", "2", "--lazy-apps") worker_pids = _get_worker_pids(proc.stdout, 2, 2) diff --git a/tests/profiling/uwsgi-app.py b/tests/profiling/uwsgi-app.py index 6d875c0aca3..7519060c82d 100644 --- a/tests/profiling/uwsgi-app.py +++ b/tests/profiling/uwsgi-app.py @@ -1,5 +1,13 @@ -import ddtrace.profiling.auto # noqa:F401 +import os + +import ddtrace.auto # noqa:F401 def application(): pass + + +if os.getenv("STOP_AFTER_LOAD"): + import sys + + sys.exit(0) diff --git a/tests/profiling_v2/collector/test_threading.py b/tests/profiling_v2/collector/test_threading.py index 084f1c77356..71120dc6787 100644 --- a/tests/profiling_v2/collector/test_threading.py +++ b/tests/profiling_v2/collector/test_threading.py @@ -195,6 +195,7 @@ def test_wrapt_disable_extensions(): @pytest.mark.skipif(not TESTING_GEVENT, reason="gevent is not available") @pytest.mark.subprocess( env=dict(DD_PROFILING_FILE_PATH=__file__), + err=None, ) def test_lock_gevent_tasks(): from gevent import monkey diff --git a/tests/profiling_v2/simple_program_fork.py b/tests/profiling_v2/simple_program_fork.py index ad8c0541ccd..947f8cb2fbe 100644 --- a/tests/profiling_v2/simple_program_fork.py +++ b/tests/profiling_v2/simple_program_fork.py @@ -2,8 +2,8 @@ import sys import threading +import ddtrace.auto from ddtrace.internal import service -import ddtrace.profiling.auto import ddtrace.profiling.bootstrap import ddtrace.profiling.profiler diff --git a/tests/profiling_v2/test_main.py b/tests/profiling_v2/test_main.py index cbd10b294a6..214b2940477 100644 --- a/tests/profiling_v2/test_main.py +++ b/tests/profiling_v2/test_main.py @@ -68,6 +68,7 @@ def test_call_script_pprof_output(tmp_path): def test_fork(tmp_path): filename = str(tmp_path / "pprof") env = os.environ.copy() + env["DD_PROFILING_ENABLED"] = "1" env["DD_PROFILING_OUTPUT_PPROF"] = filename env["DD_PROFILING_CAPTURE_PCT"] = "100" stdout, stderr, exitcode, pid = call_program( From 02a1be03176f05027e5098c641203ca50b57ef60 Mon Sep 17 00:00:00 2001 From: Taegyun Kim Date: Wed, 17 Sep 2025 18:52:46 +0000 Subject: [PATCH 2/5] update comment --- ddtrace/profiling/collector/threading.py | 27 ++++++++++-------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/ddtrace/profiling/collector/threading.py b/ddtrace/profiling/collector/threading.py index c40bafd256d..89631a6ed1d 100644 --- a/ddtrace/profiling/collector/threading.py +++ b/ddtrace/profiling/collector/threading.py @@ -1,7 +1,6 @@ from __future__ import absolute_import -import threading -import typing # noqa:F401 +import typing from ddtrace.internal._unpatched import _threading as ddtrace_threading from ddtrace.internal.datadog.profiling import stack_v2 @@ -20,26 +19,22 @@ class ThreadingLockCollector(_lock.LockCollector): PROFILED_LOCK_CLASS = _ProfiledThreadingLock def _get_patch_target(self) -> typing.Any: + # Use the copy of threading module that the target application is using import threading return threading.Lock - def _set_patch_target( - self, - value, # type: typing.Any - ): - # type: (...) -> None + def _set_patch_target(self, value: typing.Any) -> None: + import threading + threading.Lock = value # Also patch threading.Thread so echion can track thread lifetimes def init_stack_v2() -> None: - import threading - from threading import Thread - if config.stack.v2_enabled and stack_v2.is_available: - _thread_set_native_id = ddtrace_threading.Thread._set_native_id - _thread_bootstrap_inner = ddtrace_threading.Thread._bootstrap_inner + _thread_set_native_id = ddtrace_threading.Thread._set_native_id # type: ignore[attr-defined] + _thread_bootstrap_inner = ddtrace_threading.Thread._bootstrap_inner # type: ignore[attr-defined] def thread_set_native_id(self, *args, **kswargs): _thread_set_native_id(self, *args, **kswargs) @@ -49,9 +44,9 @@ def thread_bootstrap_inner(self, *args, **kwargs): _thread_bootstrap_inner(self, *args, **kwargs) stack_v2.unregister_thread(self.ident) - ddtrace_threading.Thread._set_native_id = thread_set_native_id - ddtrace_threading.Thread._bootstrap_inner = thread_bootstrap_inner + ddtrace_threading.Thread._set_native_id = thread_set_native_id # type: ignore[attr-defined] + ddtrace_threading.Thread._bootstrap_inner = thread_bootstrap_inner # type: ignore[attr-defined] # Instrument any living threads - for thread_id, thread in ddtrace_threading._active.items(): - stack_v2.register_thread(thread_id, thread.native_id, thread.name) + for thread_id, thread in ddtrace_threading._active.items(): # type: ignore[attr-defined] + stack_v2.register_thread(thread_id, thread.native_id, thread.name) # type: ignore[attr-defined] From b837e7a6122858fe313bb896dc60c62fa69acf41 Mon Sep 17 00:00:00 2001 From: Taegyun Kim Date: Wed, 17 Sep 2025 19:13:47 +0000 Subject: [PATCH 3/5] avoid circular imports --- ddtrace/profiling/collector/_lock.py | 16 +++++++++------- ddtrace/profiling/collector/_task.pyx | 22 +++++++++++----------- ddtrace/profiling/collector/memalloc.py | 11 ++++++----- ddtrace/profiling/collector/pytorch.py | 18 +++++++++--------- ddtrace/profiling/collector/stack.pyx | 16 ++++++++-------- ddtrace/profiling/profiler.py | 17 +++++++++-------- 6 files changed, 52 insertions(+), 48 deletions(-) diff --git a/ddtrace/profiling/collector/_lock.py b/ddtrace/profiling/collector/_lock.py index e261664ad78..c343da2002c 100644 --- a/ddtrace/profiling/collector/_lock.py +++ b/ddtrace/profiling/collector/_lock.py @@ -11,8 +11,10 @@ import wrapt from ddtrace.internal.datadog.profiling import ddup -from ddtrace.profiling import _threading -from ddtrace.profiling import collector +from ddtrace.profiling._threading import get_thread_name +from ddtrace.profiling._threading import get_thread_native_id +from ddtrace.profiling.collector import CaptureSampler +from ddtrace.profiling.collector import CaptureSamplerCollector from ddtrace.profiling.collector import _task from ddtrace.profiling.collector import _traceback from ddtrace.settings.profiling import config @@ -22,7 +24,7 @@ def _current_thread(): # type: (...) -> typing.Tuple[int, str] thread_id = _thread.get_ident() - return thread_id, _threading.get_thread_name(thread_id) + return thread_id, get_thread_name(thread_id) # We need to know if wrapt is compiled in C or not. If it's not using the C module, then the wrappers function will @@ -45,7 +47,7 @@ def __init__( wrapped: typing.Any, tracer: typing.Optional[Tracer], max_nframes: int, - capture_sampler: collector.CaptureSampler, + capture_sampler: CaptureSampler, endpoint_collection_enabled: bool, ) -> None: wrapt.ObjectProxy.__init__(self, wrapped) @@ -88,7 +90,7 @@ def _acquire(self, inner_func, *args, **kwargs): frames, _ = _traceback.pyframe_to_frames(frame, self._self_max_nframes) - thread_native_id = _threading.get_thread_native_id(thread_id) + thread_native_id = get_thread_native_id(thread_id) handle = ddup.SampleHandle() handle.push_monotonic_ns(end) @@ -146,7 +148,7 @@ def _release(self, inner_func, *args, **kwargs): frames, _ = _traceback.pyframe_to_frames(frame, self._self_max_nframes) - thread_native_id = _threading.get_thread_native_id(thread_id) + thread_native_id = get_thread_native_id(thread_id) handle = ddup.SampleHandle() handle.push_monotonic_ns(end) @@ -227,7 +229,7 @@ def __get__(self, instance, owner=None): return self -class LockCollector(collector.CaptureSamplerCollector): +class LockCollector(CaptureSamplerCollector): """Record lock usage.""" def __init__( diff --git a/ddtrace/profiling/collector/_task.pyx b/ddtrace/profiling/collector/_task.pyx index b7939d908d8..e00fd3f6cf1 100644 --- a/ddtrace/profiling/collector/_task.pyx +++ b/ddtrace/profiling/collector/_task.pyx @@ -4,8 +4,8 @@ import weakref from wrapt.importer import when_imported -from .. import _asyncio -from .. import _threading +from ddtrace.profiling._asyncio import get_event_loop_for_thread, current_task, _task_get_name, all_tasks +from ddtrace.profiling._threading import get_thread_name, get_thread_by_id from ddtrace.settings.profiling import config @@ -88,12 +88,12 @@ cpdef get_task(thread_id): task_name = None frame = None - loop = _asyncio.get_event_loop_for_thread(thread_id) + loop = get_event_loop_for_thread(thread_id) if loop is not None: - task = _asyncio.current_task(loop) + task = current_task(loop) if task is not None: task_id = id(task) - task_name = _asyncio._task_get_name(task) + task_name = _task_get_name(task) frame = _asyncio_task_get_frame(task) if not is_stack_v2: @@ -104,7 +104,7 @@ cpdef get_task(thread_id): gevent_thread = _gevent_tracer.gevent.thread task_id = gevent_thread.get_ident(_gevent_tracer.active_greenlet) # Greenlets might be started as Thread in gevent - task_name = _threading.get_thread_name(task_id) + task_name = get_thread_name(task_id) frame = _gevent_tracer.active_greenlet.gr_frame return task_id, task_name, frame @@ -122,7 +122,7 @@ cpdef list_tasks(thread_id): tasks = [] if not is_stack_v2 and _gevent_tracer is not None: - if type(_threading.get_thread_by_id(thread_id)).__name__.endswith("_MainThread"): + if type(get_thread_by_id(thread_id)).__name__.endswith("_MainThread"): # Under normal circumstances, the Hub is running in the main thread. # Python will only ever have a single instance of a _MainThread # class, so if we find it we attribute all the greenlets to it. @@ -130,7 +130,7 @@ cpdef list_tasks(thread_id): [ ( greenlet_id, - _threading.get_thread_name(greenlet_id), + get_thread_name(greenlet_id), greenlet.gr_frame ) for greenlet_id, greenlet in dict(_gevent_tracer.greenlets).items() @@ -138,13 +138,13 @@ cpdef list_tasks(thread_id): ] ) - loop = _asyncio.get_event_loop_for_thread(thread_id) + loop = get_event_loop_for_thread(thread_id) if loop is not None: tasks.extend([ (id(task), - _asyncio._task_get_name(task), + _task_get_name(task), _asyncio_task_get_frame(task)) - for task in _asyncio.all_tasks(loop) + for task in all_tasks(loop) ]) return tasks diff --git a/ddtrace/profiling/collector/memalloc.py b/ddtrace/profiling/collector/memalloc.py index 5317689cd65..7848631881d 100644 --- a/ddtrace/profiling/collector/memalloc.py +++ b/ddtrace/profiling/collector/memalloc.py @@ -14,8 +14,9 @@ _memalloc = None # type: ignore[assignment] from ddtrace.internal.datadog.profiling import ddup -from ddtrace.profiling import _threading -from ddtrace.profiling import collector +from ddtrace.profiling._threading import get_thread_name +from ddtrace.profiling._threading import get_thread_native_id +from ddtrace.profiling.collector import CollectorUnavailable from ddtrace.settings.profiling import config @@ -44,7 +45,7 @@ def start(self): # type: (...) -> None """Start collecting memory profiles.""" if _memalloc is None: - raise collector.CollectorUnavailable + raise CollectorUnavailable try: _memalloc.start(self.max_nframe, self.heap_sample_size) @@ -105,8 +106,8 @@ def snapshot(self): handle.push_threadinfo( thread_id, - _threading.get_thread_native_id(thread_id), - _threading.get_thread_name(thread_id), + get_thread_native_id(thread_id), + get_thread_name(thread_id), ) try: for frame in frames: diff --git a/ddtrace/profiling/collector/pytorch.py b/ddtrace/profiling/collector/pytorch.py index 187b252629a..ffb8c1ca3c9 100644 --- a/ddtrace/profiling/collector/pytorch.py +++ b/ddtrace/profiling/collector/pytorch.py @@ -8,8 +8,10 @@ import wrapt from ddtrace.internal.datadog.profiling import ddup -from ddtrace.profiling import _threading -from ddtrace.profiling import collector +from ddtrace.profiling._threading import get_thread_name +from ddtrace.profiling._threading import get_thread_native_id +from ddtrace.profiling.collector import CaptureSamplerCollector +from ddtrace.profiling.collector import CollectorUnavailable from ddtrace.settings.profiling import config from ddtrace.trace import Tracer @@ -28,7 +30,7 @@ def __init__( self._self_tracer = tracer -class MLProfilerCollector(collector.CaptureSamplerCollector): +class MLProfilerCollector(CaptureSamplerCollector): """Record ML framework (i.e. pytorch) profiler usage.""" def __init__(self): @@ -56,7 +58,7 @@ def _start_service(self): try: import torch except ImportError as e: - raise collector.CollectorUnavailable(e) + raise CollectorUnavailable(e) self._torch_module = torch self.patch() super()._start_service() # type: ignore[safe-super] @@ -190,13 +192,11 @@ def handle_torch_trace(prof): # If we can't get one, just use a default name. handle.push_threadinfo( e.thread, - _threading.get_thread_native_id(e.thread), - _threading.get_thread_name(e.thread) or "PYTORCH-CPU-THREAD-" + str(e.thread), + get_thread_native_id(e.thread), + get_thread_name(e.thread) or "PYTORCH-CPU-THREAD-" + str(e.thread), ) elif str(e.device_type).startswith("DeviceType.CUDA"): - handle.push_threadinfo( - e.thread, _threading.get_thread_native_id(e.thread), "PYTORCH-CUDA-" + str(e.device_index) - ) + handle.push_threadinfo(e.thread, get_thread_native_id(e.thread), "PYTORCH-CUDA-" + str(e.device_index)) else: raise AttributeError(f"Unexpected device_type {e.device_type}") diff --git a/ddtrace/profiling/collector/stack.pyx b/ddtrace/profiling/collector/stack.pyx index 78fb0efd26a..9ed7c225eb4 100644 --- a/ddtrace/profiling/collector/stack.pyx +++ b/ddtrace/profiling/collector/stack.pyx @@ -15,8 +15,8 @@ from ddtrace.internal import core from ddtrace.internal._threads import periodic_threads from ddtrace.internal.datadog.profiling import ddup from ddtrace.internal.datadog.profiling import stack_v2 -from ddtrace.profiling import _threading -from ddtrace.profiling import collector +from ddtrace.profiling._threading import get_thread_name, get_thread_native_id, _ThreadLink +from ddtrace.profiling.collector import PeriodicCollector from ddtrace.profiling.collector import _task from ddtrace.profiling.collector import _traceback from ddtrace.profiling.collector import threading @@ -98,7 +98,7 @@ IF UNAME_SYSNAME == "Linux": # We should now be safe doing more Pythonic stuff and maybe releasing the GIL for pthread_id, cpu_time in zip(pthread_ids, cpu_times): - thread_native_id = _threading.get_thread_native_id(pthread_id) + thread_native_id = get_thread_native_id(pthread_id) key = pthread_id, thread_native_id # Do a max(0, …) here just in case the result is < 0: # This should never happen, but it can happen if the one chance in a billion happens: @@ -136,7 +136,7 @@ ELSE: else: cpu_time //= nb_threads return { - (pthread_id, _threading.get_thread_native_id(pthread_id)): cpu_time + (pthread_id, get_thread_native_id(pthread_id)): cpu_time for pthread_id in pthread_ids } @@ -261,7 +261,7 @@ cdef collect_threads(thread_id_ignore_list, thread_time, thread_span_links) with ( pthread_id, native_thread_id, - _threading.get_thread_name(pthread_id), + get_thread_name(pthread_id), running_threads[pthread_id], current_exceptions.get(pthread_id), thread_span_links.get_active_span_from_thread_id(pthread_id) if thread_span_links else None, @@ -356,9 +356,9 @@ cdef stack_collect(ignore_profiler, thread_time, max_nframes, interval, wall_tim if typing.TYPE_CHECKING: - _thread_span_links_base = _threading._ThreadLink[ddspan.Span] + _thread_span_links_base = _ThreadLink[ddspan.Span] else: - _thread_span_links_base = _threading._ThreadLink + _thread_span_links_base = _ThreadLink class _ThreadSpanLinks(_thread_span_links_base): @@ -398,7 +398,7 @@ def _default_min_interval_time(): return sys.getswitchinterval() * 2 -class StackCollector(collector.PeriodicCollector): +class StackCollector(PeriodicCollector): """Execution stacks collector.""" __slots__ = ( diff --git a/ddtrace/profiling/profiler.py b/ddtrace/profiling/profiler.py index 187601deea3..824db2ce19e 100644 --- a/ddtrace/profiling/profiler.py +++ b/ddtrace/profiling/profiler.py @@ -13,13 +13,14 @@ from ddtrace.internal import service from ddtrace.internal.datadog.profiling import ddup from ddtrace.internal.module import ModuleWatchdog -from ddtrace.profiling import collector -from ddtrace.profiling import scheduler +from ddtrace.profiling.collector import CollectorUnavailable from ddtrace.profiling.collector import asyncio from ddtrace.profiling.collector import memalloc from ddtrace.profiling.collector import pytorch from ddtrace.profiling.collector import stack from ddtrace.profiling.collector import threading +from ddtrace.profiling.scheduler import Scheduler +from ddtrace.profiling.scheduler import ServerlessScheduler from ddtrace.settings.profiling import config as profiling_config from ddtrace.settings.profiling import config_str @@ -115,7 +116,7 @@ def __init__( # Non-user-supplied values self._collectors: List[Union[stack.StackCollector, memalloc.MemoryCollector]] = [] self._collectors_on_import: Any = None - self._scheduler: Optional[Union[scheduler.Scheduler, scheduler.ServerlessScheduler]] = None + self._scheduler: Optional[Union[Scheduler, ServerlessScheduler]] = None self._lambda_function_name: Optional[str] = os.environ.get("AWS_LAMBDA_FUNCTION_NAME") self.__post_init__() @@ -180,7 +181,7 @@ def start_collector(collector_class: Type) -> None: try: col.start() LOG.debug("Started collector %r", col) - except collector.CollectorUnavailable: + except CollectorUnavailable: LOG.debug("Collector %r is unavailable, disabling", col) return except Exception: @@ -208,7 +209,7 @@ def start_collector(collector_class: Type) -> None: try: col.start() LOG.debug("Started pytorch collector %r", col) - except collector.CollectorUnavailable: + except CollectorUnavailable: LOG.debug("Collector %r pytorch is unavailable, disabling", col) return except Exception: @@ -230,8 +231,8 @@ def start_collector(collector_class: Type) -> None: self._build_default_exporters() scheduler_class = ( - scheduler.ServerlessScheduler if self._lambda_function_name else scheduler.Scheduler - ) # type: (Type[Union[scheduler.Scheduler, scheduler.ServerlessScheduler]]) + ServerlessScheduler if self._lambda_function_name else Scheduler + ) # type: (Type[Union[Scheduler, ServerlessScheduler]]) self._scheduler = scheduler_class( before_flush=self._collectors_snapshot, @@ -263,7 +264,7 @@ def _start_service(self): for col in self._collectors: try: col.start() - except collector.CollectorUnavailable: + except CollectorUnavailable: LOG.debug("Collector %r is unavailable, disabling", col) except Exception: LOG.error("Failed to start collector %r, disabling.", col, exc_info=True) From 13218cf6dbce3bb4f0ca2b962836ccc1d5322d88 Mon Sep 17 00:00:00 2001 From: Taegyun Kim Date: Wed, 17 Sep 2025 20:56:55 +0000 Subject: [PATCH 4/5] fix uwsgi tests --- tests/profiling_v2/test_uwsgi.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/profiling_v2/test_uwsgi.py b/tests/profiling_v2/test_uwsgi.py index 20a0a92180b..1dabda11b36 100644 --- a/tests/profiling_v2/test_uwsgi.py +++ b/tests/profiling_v2/test_uwsgi.py @@ -48,14 +48,16 @@ def uwsgi(monkeypatch, tmp_path): os.unlink(socket_name) -def test_uwsgi_threads_disabled(uwsgi): +def test_uwsgi_threads_disabled(uwsgi, monkeypatch): + monkeypatch.setenv("STOP_AFTER_LOAD", "1") proc = uwsgi() stdout, _ = proc.communicate() - assert proc.wait() != 0 + assert proc.wait() == 0 assert THREADS_MSG in stdout -def test_uwsgi_threads_number_set(uwsgi): +def test_uwsgi_threads_number_set(uwsgi, monkeypatch): + monkeypatch.setenv("STOP_AFTER_LOAD", "1") proc = uwsgi("--threads", "1") try: stdout, _ = proc.communicate(timeout=1) @@ -81,6 +83,7 @@ def test_uwsgi_threads_enabled(uwsgi, tmp_path, monkeypatch): def test_uwsgi_threads_processes_no_primary(uwsgi, monkeypatch): + monkeypatch.setenv("STOP_AFTER_LOAD", "1") proc = uwsgi("--enable-threads", "--processes", "2") stdout, _ = proc.communicate() assert ( From 4cf57347a56d499cef77cf190accc6f21a4eefe6 Mon Sep 17 00:00:00 2001 From: Taegyun Kim Date: Wed, 17 Sep 2025 21:09:10 +0000 Subject: [PATCH 5/5] fix telemetry test --- tests/telemetry/test_writer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/telemetry/test_writer.py b/tests/telemetry/test_writer.py index 74458d3edbe..e93152d55b5 100644 --- a/tests/telemetry/test_writer.py +++ b/tests/telemetry/test_writer.py @@ -452,7 +452,7 @@ def test_app_started_event_configuration_override(test_agent_session, run_python {"name": "DD_PROFILING_SAMPLE_POOL_CAPACITY", "origin": "default", "value": 4}, {"name": "DD_PROFILING_STACK_ENABLED", "origin": "env_var", "value": False}, {"name": "DD_PROFILING_STACK_V2_ENABLED", "origin": "default", "value": True}, - {"name": "DD_PROFILING_TAGS", "origin": "default", "value": ""}, + {"name": "DD_PROFILING_TAGS", "origin": "default", "value": "team:apm,component:web"}, {"name": "DD_PROFILING_TIMELINE_ENABLED", "origin": "default", "value": True}, {"name": "DD_PROFILING_UPLOAD_INTERVAL", "origin": "env_var", "value": 10.0}, {"name": "DD_REMOTE_CONFIGURATION_ENABLED", "origin": "env_var", "value": True},