Skip to content

Commit fa358f8

Browse files
feat(profiling): add serverless scheduler and conditionally use it (#3870) (#4058)
Based on Julien's feedback in our RFC, I've added a small scheduler which flushes after the periodic function is called 60 times. This ensures that profiles are sampled over the course of 60s of runtime instead of wallclock time. (cherry picked from commit 83ce6ce) Co-authored-by: AJ Stuyvenberg <[email protected]>
1 parent e797c0d commit fa358f8

File tree

6 files changed

+83
-4
lines changed

6 files changed

+83
-4
lines changed

ddtrace/profiling/profiler.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,14 @@ class _ProfilerInstance(service.Service):
131131

132132
_recorder = attr.ib(init=False, default=None)
133133
_collectors = attr.ib(init=False, default=None)
134-
_scheduler = attr.ib(init=False, default=None)
134+
_scheduler = attr.ib(
135+
init=False,
136+
default=None,
137+
type=scheduler.Scheduler,
138+
)
139+
_lambda_function_name = attr.ib(
140+
init=False, factory=lambda: os.environ.get("AWS_LAMBDA_FUNCTION_NAME"), type=Optional[str]
141+
)
135142

136143
ENDPOINT_TEMPLATE = "https://intake.profile.{}"
137144

@@ -165,6 +172,9 @@ def _build_default_exporters(self):
165172
# to the agent base path.
166173
endpoint_path = "profiling/v1/input"
167174

175+
if self._lambda_function_name is not None:
176+
self.tags.update({"functionname": self._lambda_function_name.encode("utf-8")})
177+
168178
return [
169179
http.PprofHTTPExporter(
170180
service=self.service,
@@ -209,9 +219,11 @@ def __attrs_post_init__(self):
209219
exporters = self._build_default_exporters()
210220

211221
if exporters:
212-
self._scheduler = scheduler.Scheduler(
213-
recorder=r, exporters=exporters, before_flush=self._collectors_snapshot
214-
)
222+
if self._lambda_function_name is None:
223+
scheduler_class = scheduler.Scheduler
224+
else:
225+
scheduler_class = scheduler.ServerlessScheduler
226+
self._scheduler = scheduler_class(recorder=r, exporters=exporters, before_flush=self._collectors_snapshot)
215227

216228
self.set_asyncio_event_loop_policy()
217229

ddtrace/profiling/scheduler.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,3 +65,35 @@ def periodic(self):
6565
self.flush()
6666
finally:
6767
self.interval = max(0, self._configured_interval - (compat.monotonic() - start_time))
68+
69+
70+
@attr.s
71+
class ServerlessScheduler(Scheduler):
72+
"""Serverless scheduler that works on, e.g., AWS Lambda.
73+
74+
The idea with this scheduler is to not sleep 60s, but to sleep 1s and flush out profiles after 60 sleeping period.
75+
As the service can be frozen a few seconds after flushing out a profile, we want to make sure the next flush is not
76+
> 60s later, but after at least 60 periods of 1s.
77+
78+
"""
79+
80+
# We force this interval everywhere
81+
FORCED_INTERVAL = 1.0
82+
FLUSH_AFTER_INTERVALS = 60.0
83+
84+
_interval = attr.ib(default=FORCED_INTERVAL, type=float)
85+
_profiled_intervals = attr.ib(init=False, default=0)
86+
87+
def periodic(self):
88+
# Check both the number of intervals and time frame to be sure we don't flush, e.g., empty profiles
89+
if self._profiled_intervals >= self.FLUSH_AFTER_INTERVALS and (compat.time_ns() - self._last_export) >= (
90+
self.FORCED_INTERVAL * self.FLUSH_AFTER_INTERVALS
91+
):
92+
try:
93+
super(ServerlessScheduler, self).periodic()
94+
finally:
95+
# Override interval so it's always back to the value we n
96+
self.interval = self.FORCED_INTERVAL
97+
self._profiled_intervals = 0
98+
else:
99+
self._profiled_intervals += 1

docs/spelling_wordlist.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,3 +186,5 @@ yaaredis
186186
Kinesis
187187
AppSec
188188
libddwaf
189+
Serverless
190+
serverless
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
features:
3+
- |
4+
Adds support for Lambda profiling, which can be enabled by starting the profiler outside of the handler (on cold start).

tests/profiling/test_profiler.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from ddtrace.profiling import event
1010
from ddtrace.profiling import exporter
1111
from ddtrace.profiling import profiler
12+
from ddtrace.profiling import scheduler
1213
from ddtrace.profiling.collector import asyncio
1314
from ddtrace.profiling.collector import memalloc
1415
from ddtrace.profiling.collector import stack
@@ -378,3 +379,11 @@ def test_default_collectors():
378379
else:
379380
assert any(isinstance(c, asyncio.AsyncioLockCollector) for c in p._profiler._collectors)
380381
p.stop(flush=False)
382+
383+
384+
def test_profiler_serverless(monkeypatch):
385+
# type: (...) -> None
386+
monkeypatch.setenv("AWS_LAMBDA_FUNCTION_NAME", "foobar")
387+
p = profiler.Profiler()
388+
assert isinstance(p._scheduler, scheduler.ServerlessScheduler)
389+
assert p.tags["functionname"] == b"foobar"

tests/profiling/test_scheduler.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
# -*- encoding: utf-8 -*-
22
import logging
33

4+
import mock
5+
6+
from ddtrace.internal import compat
47
from ddtrace.profiling import event
58
from ddtrace.profiling import exporter
69
from ddtrace.profiling import recorder
@@ -54,3 +57,20 @@ def call_me():
5457
assert caplog.record_tuples == [
5558
(("ddtrace.profiling.scheduler", logging.ERROR, "Scheduler before_flush hook failed"))
5659
]
60+
61+
62+
@mock.patch("ddtrace.profiling.scheduler.Scheduler.periodic")
63+
def test_serverless_periodic(mock_periodic):
64+
r = recorder.Recorder()
65+
s = scheduler.ServerlessScheduler(r, [exporter.NullExporter()])
66+
# Fake start()
67+
s._last_export = compat.time_ns()
68+
s.periodic()
69+
assert s._profiled_intervals == 1
70+
mock_periodic.assert_not_called()
71+
s._last_export = compat.time_ns() - 65
72+
s._profiled_intervals = 65
73+
s.periodic()
74+
assert s._profiled_intervals == 0
75+
assert s.interval == 1
76+
mock_periodic.assert_called()

0 commit comments

Comments
 (0)