Skip to content

Commit 83ce6ce

Browse files
authored
feat(profiling): add serverless scheduler and conditionally use it (#3870)
Based on Julien's feedback in our RFC, I've added a small scheduler which flushes after the periodic function is called 60 times. This ensures that profiles are sampled over the course of 60s of runtime instead of wallclock time.
1 parent 4defacb commit 83ce6ce

File tree

6 files changed

+83
-4
lines changed

6 files changed

+83
-4
lines changed

ddtrace/profiling/profiler.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,14 @@ class _ProfilerInstance(service.Service):
131131

132132
_recorder = attr.ib(init=False, default=None)
133133
_collectors = attr.ib(init=False, default=None)
134-
_scheduler = attr.ib(init=False, default=None)
134+
_scheduler = attr.ib(
135+
init=False,
136+
default=None,
137+
type=scheduler.Scheduler,
138+
)
139+
_lambda_function_name = attr.ib(
140+
init=False, factory=lambda: os.environ.get("AWS_LAMBDA_FUNCTION_NAME"), type=Optional[str]
141+
)
135142

136143
ENDPOINT_TEMPLATE = "https://intake.profile.{}"
137144

@@ -165,6 +172,9 @@ def _build_default_exporters(self):
165172
# to the agent base path.
166173
endpoint_path = "profiling/v1/input"
167174

175+
if self._lambda_function_name is not None:
176+
self.tags.update({"functionname": self._lambda_function_name.encode("utf-8")})
177+
168178
return [
169179
http.PprofHTTPExporter(
170180
service=self.service,
@@ -209,9 +219,11 @@ def __attrs_post_init__(self):
209219
exporters = self._build_default_exporters()
210220

211221
if exporters:
212-
self._scheduler = scheduler.Scheduler(
213-
recorder=r, exporters=exporters, before_flush=self._collectors_snapshot
214-
)
222+
if self._lambda_function_name is None:
223+
scheduler_class = scheduler.Scheduler
224+
else:
225+
scheduler_class = scheduler.ServerlessScheduler
226+
self._scheduler = scheduler_class(recorder=r, exporters=exporters, before_flush=self._collectors_snapshot)
215227

216228
self.set_asyncio_event_loop_policy()
217229

ddtrace/profiling/scheduler.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,3 +65,35 @@ def periodic(self):
6565
self.flush()
6666
finally:
6767
self.interval = max(0, self._configured_interval - (compat.monotonic() - start_time))
68+
69+
70+
@attr.s
class ServerlessScheduler(Scheduler):
    """Serverless scheduler that works on, e.g., AWS Lambda.

    The idea with this scheduler is to not sleep 60s, but to sleep 1s and flush out profiles after 60 sleeping
    periods. As the service can be frozen a few seconds after flushing out a profile, we want to make sure the next
    flush is not > 60s later, but after at least 60 periods of 1s.

    """

    # We force this interval everywhere (seconds between two `periodic` calls)
    FORCED_INTERVAL = 1.0
    # Number of `periodic` calls to accumulate before a flush is attempted
    FLUSH_AFTER_INTERVALS = 60.0

    _interval = attr.ib(default=FORCED_INTERVAL, type=float)
    _profiled_intervals = attr.ib(init=False, default=0)

    def periodic(self):
        """Count one profiled interval, flushing once enough intervals and enough time have accumulated.

        A flush (delegated to ``Scheduler.periodic``) only happens when at least ``FLUSH_AFTER_INTERVALS``
        intervals were counted *and* at least ``FORCED_INTERVAL * FLUSH_AFTER_INTERVALS`` seconds elapsed since
        ``_last_export``. Otherwise the interval counter is simply incremented.
        """
        # Check both the number of intervals and time frame to be sure we don't flush, e.g., empty profiles.
        # `_last_export` is compared against `compat.time_ns()`, i.e. it is expressed in nanoseconds, while the
        # two constants describe a duration in seconds: convert before comparing, otherwise the time frame check
        # is satisfied after only 60ns and never guards anything.
        min_elapsed_ns = self.FORCED_INTERVAL * self.FLUSH_AFTER_INTERVALS * 1e9
        if (
            self._profiled_intervals >= self.FLUSH_AFTER_INTERVALS
            and (compat.time_ns() - self._last_export) >= min_elapsed_ns
        ):
            try:
                super(ServerlessScheduler, self).periodic()
            finally:
                # Override interval so it's always back to the value we need: the parent `periodic` shown in
                # this file reassigns `self.interval` based on its own configured interval.
                self.interval = self.FORCED_INTERVAL
                self._profiled_intervals = 0
        else:
            self._profiled_intervals += 1

docs/spelling_wordlist.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,3 +186,5 @@ yaaredis
186186
Kinesis
187187
AppSec
188188
libddwaf
189+
Serverless
190+
serverless
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
features:
3+
- |
4+
Adds support for Lambda profiling, which can be enabled by starting the profiler outside of the handler (on cold start).

tests/profiling/test_profiler.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from ddtrace.profiling import event
1010
from ddtrace.profiling import exporter
1111
from ddtrace.profiling import profiler
12+
from ddtrace.profiling import scheduler
1213
from ddtrace.profiling.collector import asyncio
1314
from ddtrace.profiling.collector import memalloc
1415
from ddtrace.profiling.collector import stack
@@ -378,3 +379,11 @@ def test_default_collectors():
378379
else:
379380
assert any(isinstance(c, asyncio.AsyncioLockCollector) for c in p._profiler._collectors)
380381
p.stop(flush=False)
382+
383+
384+
def test_profiler_serverless(monkeypatch):
    # type: (...) -> None
    """Check the profiler setup when ``AWS_LAMBDA_FUNCTION_NAME`` is present in the environment.

    The profiler is expected to pick the serverless scheduler and to expose the Lambda function name,
    encoded as bytes, under the ``functionname`` tag.
    """
    # The environment variable has to be set before the profiler is instantiated.
    monkeypatch.setenv("AWS_LAMBDA_FUNCTION_NAME", "foobar")
    lambda_profiler = profiler.Profiler()

    selected_scheduler = lambda_profiler._scheduler
    assert isinstance(selected_scheduler, scheduler.ServerlessScheduler)

    function_tag = lambda_profiler.tags["functionname"]
    assert function_tag == b"foobar"

tests/profiling/test_scheduler.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
# -*- encoding: utf-8 -*-
22
import logging
33

4+
import mock
5+
6+
from ddtrace.internal import compat
47
from ddtrace.profiling import event
58
from ddtrace.profiling import exporter
69
from ddtrace.profiling import recorder
@@ -54,3 +57,20 @@ def call_me():
5457
assert caplog.record_tuples == [
5558
(("ddtrace.profiling.scheduler", logging.ERROR, "Scheduler before_flush hook failed"))
5659
]
60+
61+
62+
@mock.patch("ddtrace.profiling.scheduler.Scheduler.periodic")
def test_serverless_periodic(mock_periodic):
    """Check that the serverless scheduler only delegates to ``Scheduler.periodic`` once a flush is due."""
    r = recorder.Recorder()
    s = scheduler.ServerlessScheduler(r, [exporter.NullExporter()])
    # Fake start()
    s._last_export = compat.time_ns()
    s.periodic()
    # Nothing is due yet: the call is only counted as one profiled interval
    assert s._profiled_intervals == 1
    mock_periodic.assert_not_called()
    # Pretend the last export happened 65 seconds ago. `_last_export` holds a `compat.time_ns()` value, so the
    # offset has to be expressed in nanoseconds (a bare `65` would only be 65ns).
    s._last_export = compat.time_ns() - 65 * 10 ** 9
    s._profiled_intervals = 65
    s.periodic()
    # A flush happened: the counter is reset and the forced interval is restored
    assert s._profiled_intervals == 0
    assert s.interval == 1
    mock_periodic.assert_called()

0 commit comments

Comments
 (0)