8 changes: 0 additions & 8 deletions ddtrace/bootstrap/preload.py
@@ -53,14 +53,6 @@ def register_post_preload(func: t.Callable) -> None:
except Exception:
log.error("failed to enable crashtracking", exc_info=True)


if profiling_config.enabled:
log.debug("profiler enabled via environment variable")
try:
import ddtrace.profiling.auto # noqa: F401
except Exception:
log.error("failed to enable profiling", exc_info=True)

if config._runtime_metrics_enabled:
RuntimeWorker.enable()

7 changes: 6 additions & 1 deletion ddtrace/profiling/__init__.py
@@ -1 +1,6 @@
from .profiler import Profiler # noqa:F401
from ddtrace.internal.module import lazy


@lazy
def _():
    from ddtrace.profiling.profiler import Profiler # noqa:F401
3 changes: 3 additions & 0 deletions ddtrace/profiling/__init__.pyi
@@ -0,0 +1,3 @@
class Profiler:
    def start(self) -> None: ...
    def stop(self, flush: bool = True) -> None: ...
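Taken together, the lazy hook in ddtrace/profiling/__init__.py and the stub above are meant to keep `from ddtrace.profiling import Profiler` working for callers (and visible to type checkers) while deferring the import of ddtrace.profiling.profiler until the name is actually used. A minimal usage sketch, assuming the `@lazy` hook from ddtrace.internal.module resolves the attribute on first access:

from ddtrace.profiling import Profiler  # resolved lazily at first use

prof = Profiler()
prof.start()
# ... application code to be profiled ...
prof.stop(flush=True)  # matches the signature declared in the .pyi stub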
16 changes: 9 additions & 7 deletions ddtrace/profiling/collector/_lock.py
@@ -11,8 +11,10 @@
import wrapt

from ddtrace.internal.datadog.profiling import ddup
from ddtrace.profiling import _threading
from ddtrace.profiling import collector
from ddtrace.profiling._threading import get_thread_name
from ddtrace.profiling._threading import get_thread_native_id
from ddtrace.profiling.collector import CaptureSampler
from ddtrace.profiling.collector import CaptureSamplerCollector
from ddtrace.profiling.collector import _task
from ddtrace.profiling.collector import _traceback
from ddtrace.settings.profiling import config
@@ -22,7 +24,7 @@
def _current_thread():
# type: (...) -> typing.Tuple[int, str]
thread_id = _thread.get_ident()
return thread_id, _threading.get_thread_name(thread_id)
return thread_id, get_thread_name(thread_id)


# We need to know if wrapt is compiled in C or not. If it's not using the C module, then the wrappers function will
@@ -45,7 +47,7 @@ def __init__(
wrapped: typing.Any,
tracer: typing.Optional[Tracer],
max_nframes: int,
capture_sampler: collector.CaptureSampler,
capture_sampler: CaptureSampler,
endpoint_collection_enabled: bool,
) -> None:
wrapt.ObjectProxy.__init__(self, wrapped)
@@ -88,7 +90,7 @@ def _acquire(self, inner_func, *args, **kwargs):

frames, _ = _traceback.pyframe_to_frames(frame, self._self_max_nframes)

thread_native_id = _threading.get_thread_native_id(thread_id)
thread_native_id = get_thread_native_id(thread_id)

handle = ddup.SampleHandle()
handle.push_monotonic_ns(end)
@@ -146,7 +148,7 @@ def _release(self, inner_func, *args, **kwargs):

frames, _ = _traceback.pyframe_to_frames(frame, self._self_max_nframes)

thread_native_id = _threading.get_thread_native_id(thread_id)
thread_native_id = get_thread_native_id(thread_id)

handle = ddup.SampleHandle()
handle.push_monotonic_ns(end)
@@ -227,7 +229,7 @@ def __get__(self, instance, owner=None):
return self


class LockCollector(collector.CaptureSamplerCollector):
class LockCollector(CaptureSamplerCollector):
"""Record lock usage."""

def __init__(
22 changes: 11 additions & 11 deletions ddtrace/profiling/collector/_task.pyx
@@ -4,8 +4,8 @@ import weakref

from wrapt.importer import when_imported

from .. import _asyncio
from .. import _threading
from ddtrace.profiling._asyncio import get_event_loop_for_thread, current_task, _task_get_name, all_tasks
from ddtrace.profiling._threading import get_thread_name, get_thread_by_id
from ddtrace.settings.profiling import config


@@ -88,12 +88,12 @@ cpdef get_task(thread_id):
task_name = None
frame = None

loop = _asyncio.get_event_loop_for_thread(thread_id)
loop = get_event_loop_for_thread(thread_id)
if loop is not None:
task = _asyncio.current_task(loop)
task = current_task(loop)
if task is not None:
task_id = id(task)
task_name = _asyncio._task_get_name(task)
task_name = _task_get_name(task)
frame = _asyncio_task_get_frame(task)

if not is_stack_v2:
@@ -104,7 +104,7 @@ cpdef get_task(thread_id):
gevent_thread = _gevent_tracer.gevent.thread
task_id = gevent_thread.get_ident(_gevent_tracer.active_greenlet)
# Greenlets might be started as Thread in gevent
task_name = _threading.get_thread_name(task_id)
task_name = get_thread_name(task_id)
frame = _gevent_tracer.active_greenlet.gr_frame

return task_id, task_name, frame
@@ -122,29 +122,29 @@ cpdef list_tasks(thread_id):
tasks = []

if not is_stack_v2 and _gevent_tracer is not None:
if type(_threading.get_thread_by_id(thread_id)).__name__.endswith("_MainThread"):
if type(get_thread_by_id(thread_id)).__name__.endswith("_MainThread"):
# Under normal circumstances, the Hub is running in the main thread.
# Python will only ever have a single instance of a _MainThread
# class, so if we find it we attribute all the greenlets to it.
tasks.extend(
[
(
greenlet_id,
_threading.get_thread_name(greenlet_id),
get_thread_name(greenlet_id),
greenlet.gr_frame
)
for greenlet_id, greenlet in dict(_gevent_tracer.greenlets).items()
if not greenlet.dead
]
)

loop = _asyncio.get_event_loop_for_thread(thread_id)
loop = get_event_loop_for_thread(thread_id)
if loop is not None:
tasks.extend([
(id(task),
_asyncio._task_get_name(task),
_task_get_name(task),
_asyncio_task_get_frame(task))
for task in _asyncio.all_tasks(loop)
for task in all_tasks(loop)
])

return tasks
11 changes: 6 additions & 5 deletions ddtrace/profiling/collector/memalloc.py
@@ -14,8 +14,9 @@
_memalloc = None # type: ignore[assignment]

from ddtrace.internal.datadog.profiling import ddup
from ddtrace.profiling import _threading
from ddtrace.profiling import collector
from ddtrace.profiling._threading import get_thread_name
from ddtrace.profiling._threading import get_thread_native_id
from ddtrace.profiling.collector import CollectorUnavailable
from ddtrace.settings.profiling import config


@@ -44,7 +45,7 @@ def start(self):
# type: (...) -> None
"""Start collecting memory profiles."""
if _memalloc is None:
raise collector.CollectorUnavailable
raise CollectorUnavailable

try:
_memalloc.start(self.max_nframe, self.heap_sample_size)
@@ -105,8 +106,8 @@ def snapshot(self):

handle.push_threadinfo(
thread_id,
_threading.get_thread_native_id(thread_id),
_threading.get_thread_name(thread_id),
get_thread_native_id(thread_id),
get_thread_name(thread_id),
)
try:
for frame in frames:
18 changes: 9 additions & 9 deletions ddtrace/profiling/collector/pytorch.py
@@ -8,8 +8,10 @@
import wrapt

from ddtrace.internal.datadog.profiling import ddup
from ddtrace.profiling import _threading
from ddtrace.profiling import collector
from ddtrace.profiling._threading import get_thread_name
from ddtrace.profiling._threading import get_thread_native_id
from ddtrace.profiling.collector import CaptureSamplerCollector
from ddtrace.profiling.collector import CollectorUnavailable
from ddtrace.settings.profiling import config
from ddtrace.trace import Tracer

@@ -28,7 +30,7 @@ def __init__(
self._self_tracer = tracer


class MLProfilerCollector(collector.CaptureSamplerCollector):
class MLProfilerCollector(CaptureSamplerCollector):
"""Record ML framework (i.e. pytorch) profiler usage."""

def __init__(self):
@@ -56,7 +58,7 @@ def _start_service(self):
try:
import torch
except ImportError as e:
raise collector.CollectorUnavailable(e)
raise CollectorUnavailable(e)
self._torch_module = torch
self.patch()
super()._start_service() # type: ignore[safe-super]
@@ -190,13 +192,11 @@ def handle_torch_trace(prof):
# If we can't get one, just use a default name.
handle.push_threadinfo(
e.thread,
_threading.get_thread_native_id(e.thread),
_threading.get_thread_name(e.thread) or "PYTORCH-CPU-THREAD-" + str(e.thread),
get_thread_native_id(e.thread),
get_thread_name(e.thread) or "PYTORCH-CPU-THREAD-" + str(e.thread),
)
elif str(e.device_type).startswith("DeviceType.CUDA"):
handle.push_threadinfo(
e.thread, _threading.get_thread_native_id(e.thread), "PYTORCH-CUDA-" + str(e.device_index)
)
handle.push_threadinfo(e.thread, get_thread_native_id(e.thread), "PYTORCH-CUDA-" + str(e.device_index))
else:
raise AttributeError(f"Unexpected device_type {e.device_type}")

16 changes: 8 additions & 8 deletions ddtrace/profiling/collector/stack.pyx
@@ -15,8 +15,8 @@ from ddtrace.internal import core
from ddtrace.internal._threads import periodic_threads
from ddtrace.internal.datadog.profiling import ddup
from ddtrace.internal.datadog.profiling import stack_v2
from ddtrace.profiling import _threading
from ddtrace.profiling import collector
from ddtrace.profiling._threading import get_thread_name, get_thread_native_id, _ThreadLink
from ddtrace.profiling.collector import PeriodicCollector
from ddtrace.profiling.collector import _task
from ddtrace.profiling.collector import _traceback
from ddtrace.profiling.collector import threading
@@ -98,7 +98,7 @@ IF UNAME_SYSNAME == "Linux":

# We should now be safe doing more Pythonic stuff and maybe releasing the GIL
for pthread_id, cpu_time in zip(pthread_ids, cpu_times):
thread_native_id = _threading.get_thread_native_id(pthread_id)
thread_native_id = get_thread_native_id(pthread_id)
key = pthread_id, thread_native_id
# Do a max(0, …) here just in case the result is < 0:
# This should never happen, but it can happen if the one chance in a billion happens:
@@ -136,7 +136,7 @@ ELSE:
else:
cpu_time //= nb_threads
return {
(pthread_id, _threading.get_thread_native_id(pthread_id)): cpu_time
(pthread_id, get_thread_native_id(pthread_id)): cpu_time
for pthread_id in pthread_ids
}

@@ -261,7 +261,7 @@ cdef collect_threads(thread_id_ignore_list, thread_time, thread_span_links) with
(
pthread_id,
native_thread_id,
_threading.get_thread_name(pthread_id),
get_thread_name(pthread_id),
running_threads[pthread_id],
current_exceptions.get(pthread_id),
thread_span_links.get_active_span_from_thread_id(pthread_id) if thread_span_links else None,
@@ -356,9 +356,9 @@ cdef stack_collect(ignore_profiler, thread_time, max_nframes, interval, wall_tim


if typing.TYPE_CHECKING:
_thread_span_links_base = _threading._ThreadLink[ddspan.Span]
_thread_span_links_base = _ThreadLink[ddspan.Span]
else:
_thread_span_links_base = _threading._ThreadLink
_thread_span_links_base = _ThreadLink


class _ThreadSpanLinks(_thread_span_links_base):
@@ -398,7 +398,7 @@ def _default_min_interval_time():
return sys.getswitchinterval() * 2


class StackCollector(collector.PeriodicCollector):
class StackCollector(PeriodicCollector):
"""Execution stacks collector."""

__slots__ = (
31 changes: 15 additions & 16 deletions ddtrace/profiling/collector/threading.py
@@ -1,7 +1,6 @@
from __future__ import absolute_import

import threading
import typing # noqa:F401
import typing

from ddtrace.internal._unpatched import _threading as ddtrace_threading
from ddtrace.internal.datadog.profiling import stack_v2
@@ -19,23 +18,23 @@ class ThreadingLockCollector(_lock.LockCollector):

PROFILED_LOCK_CLASS = _ProfiledThreadingLock

def _get_patch_target(self):
# type: (...) -> typing.Any
def _get_patch_target(self) -> typing.Any:
# Use the copy of threading module that the target application is using
import threading

return threading.Lock

def _set_patch_target(
self,
value, # type: typing.Any
):
# type: (...) -> None
def _set_patch_target(self, value: typing.Any) -> None:
import threading

threading.Lock = value


# Also patch threading.Thread so echion can track thread lifetimes
def init_stack_v2():
def init_stack_v2() -> None:
if config.stack.v2_enabled and stack_v2.is_available:
_thread_set_native_id = ddtrace_threading.Thread._set_native_id
_thread_bootstrap_inner = ddtrace_threading.Thread._bootstrap_inner
_thread_set_native_id = ddtrace_threading.Thread._set_native_id # type: ignore[attr-defined]
_thread_bootstrap_inner = ddtrace_threading.Thread._bootstrap_inner # type: ignore[attr-defined]

def thread_set_native_id(self, *args, **kswargs):
_thread_set_native_id(self, *args, **kswargs)
@@ -45,9 +44,9 @@ def thread_bootstrap_inner(self, *args, **kwargs):
_thread_bootstrap_inner(self, *args, **kwargs)
stack_v2.unregister_thread(self.ident)

ddtrace_threading.Thread._set_native_id = thread_set_native_id
ddtrace_threading.Thread._bootstrap_inner = thread_bootstrap_inner
ddtrace_threading.Thread._set_native_id = thread_set_native_id # type: ignore[attr-defined]
ddtrace_threading.Thread._bootstrap_inner = thread_bootstrap_inner # type: ignore[attr-defined]

# Instrument any living threads
for thread_id, thread in ddtrace_threading._active.items():
stack_v2.register_thread(thread_id, thread.native_id, thread.name)
for thread_id, thread in ddtrace_threading._active.items(): # type: ignore[attr-defined]
stack_v2.register_thread(thread_id, thread.native_id, thread.name) # type: ignore[attr-defined]
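The init_stack_v2 patching above follows a wrap-and-delegate pattern: the original Thread hooks keep running, and thread registration/unregistration with stack_v2 is layered on top. A minimal, self-contained sketch of the same pattern, using a plain dict in place of stack_v2 (the names below are illustrative, and Thread._set_native_id is a private CPython hook that may not exist on every platform or version):

import threading

_live_threads = {}  # stand-in for the stack_v2 thread registry

_orig_set_native_id = threading.Thread._set_native_id
_orig_bootstrap_inner = threading.Thread._bootstrap_inner

def _set_native_id(self, *args, **kwargs):
    _orig_set_native_id(self, *args, **kwargs)                # keep CPython's behaviour
    _live_threads[self.ident] = (self.native_id, self.name)   # register once the native id is known

def _bootstrap_inner(self, *args, **kwargs):
    _orig_bootstrap_inner(self, *args, **kwargs)              # run the thread body as usual
    _live_threads.pop(self.ident, None)                       # unregister when the thread exits

threading.Thread._set_native_id = _set_native_id
threading.Thread._bootstrap_inner = _bootstrap_inner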
34 changes: 34 additions & 0 deletions ddtrace/profiling/product.py
@@ -0,0 +1,34 @@
from ddtrace.settings.profiling import config


def post_preload():
    pass


def start():
    if config.enabled:
        import ddtrace.profiling.auto  # noqa: F401


def restart(join=False):
    if config.enabled:
        from ddtrace.profiling import bootstrap

        try:
            bootstrap.profiler._restart_on_fork()
        except AttributeError:
            pass


def stop(join=False):
    if config.enabled:
        from ddtrace.profiling import bootstrap

        try:
            bootstrap.profiler.stop()
        except AttributeError:
            pass


def at_exit(join=False):
    stop(join=join)
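This new product module takes over the profiler startup that was removed from ddtrace/bootstrap/preload.py above, exposing it through the post_preload/start/restart/stop/at_exit lifecycle hooks. How ddtrace discovers and orders these hooks lives in its internal product manager and is not part of this diff; the sketch below only illustrates the intended calling sequence, and the fork and exit wiring shown is an assumption, not the library's actual registration code:

import atexit
import os

from ddtrace.profiling import product as profiling_product

profiling_product.post_preload()   # currently a no-op
profiling_product.start()          # imports ddtrace.profiling.auto only when profiling is enabled
os.register_at_fork(after_in_child=profiling_product.restart)  # POSIX only: re-arm the profiler in forked children
atexit.register(profiling_product.at_exit)                     # stop and flush on interpreter shutdown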