Skip to content

Commit 98c3dd3

Browse files
committed
chore(telemetry): start telemetry as early as possible and delay sending events until app-started
1 parent 678abc9 commit 98c3dd3

File tree

8 files changed

+53
-72
lines changed

8 files changed

+53
-72
lines changed

ddtrace/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@
2525

2626

2727
telemetry.install_excepthook()
28+
# In order to support 3.12, we start the writer upon initialization.
29+
# See https://github.com/python/cpython/pull/104826.
30+
# Telemetry events will only be sent after the `app-started` is queued.
31+
# This will occur when the agent writer starts.
32+
telemetry.telemetry_writer.enable()
2833

2934
from ._monkey import patch # noqa: E402
3035
from ._monkey import patch_all # noqa: E402

ddtrace/internal/telemetry/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ def _excepthook(tp, value, root_traceback):
4949
error_msg = "{}:{} {}".format(filename, lineno, str(value))
5050
telemetry_writer.add_integration(integration_name, True, error_msg=error_msg)
5151

52+
if telemetry_writer.started is False:
53+
telemetry_writer._app_started_event(False)
54+
telemetry_writer._app_dependencies_loaded_event()
55+
5256
telemetry_writer.app_shutdown()
5357
telemetry_writer.disable()
5458

ddtrace/internal/telemetry/data.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@
44
from typing import List
55
from typing import Tuple
66

7-
import ddtrace
87
from ddtrace.internal.compat import PY3
98
from ddtrace.internal.constants import DEFAULT_SERVICE_NAME
109
from ddtrace.internal.packages import get_distributions
1110
from ddtrace.internal.runtime.container import get_container_info
1211
from ddtrace.internal.utils.cache import cached
12+
from ddtrace.version import get_version
1313

1414
from ...settings import _config as config
1515
from ..hostname import get_hostname
@@ -63,7 +63,7 @@ def _get_application(key):
6363
"env": env or "",
6464
"language_name": "python",
6565
"language_version": _format_version_info(sys.version_info),
66-
"tracer_version": ddtrace.__version__,
66+
"tracer_version": get_version(),
6767
"runtime_name": platform.python_implementation(),
6868
"runtime_version": _format_version_info(sys.implementation.version) if PY3 else "",
6969
"products": _get_products(),
@@ -88,7 +88,7 @@ def get_application(service, version, env):
8888
def _get_products():
8989
# type: () -> Dict
9090
return {
91-
"appsec": {"version": ddtrace.__version__, "enabled": config._appsec_enabled},
91+
"appsec": {"version": get_version(), "enabled": config._appsec_enabled},
9292
}
9393

9494

ddtrace/internal/telemetry/writer.py

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,6 @@ def enable(self):
207207

208208
if self._is_periodic:
209209
self.start()
210-
atexit.register(self.app_shutdown)
211210
return True
212211

213212
self.status = ServiceStatus.RUNNING
@@ -282,14 +281,18 @@ def add_error(self, code, msg, filename, line_number):
282281
msg = "%s:%s: %s" % (filename, line_number, msg)
283282
self._error = (code, msg)
284283

285-
def _app_started_event(self):
286-
# type: () -> None
284+
def _app_started_event(self, register_app_shutdown=True):
285+
# type: (bool) -> None
287286
"""Sent when TelemetryWriter is enabled or forks"""
288287
if self._forked:
289288
# app-started events should only be sent by the main process
290289
return
291290
# List of configurations to be collected
292291

292+
self.started = True
293+
if register_app_shutdown:
294+
atexit.register(self.app_shutdown)
295+
293296
self.add_configurations(
294297
[
295298
(TELEMETRY_TRACING_ENABLED, config._tracing_enabled, "unknown"),
@@ -576,15 +579,6 @@ def periodic(self, force_flush=False):
576579
for telemetry_event in telemetry_events:
577580
self._client.send_event(telemetry_event)
578581

579-
def start(self, *args, **kwargs):
580-
# type: (...) -> None
581-
super(TelemetryWriter, self).start(*args, **kwargs)
582-
# Queue app-started event after the telemetry worker thread is running
583-
if self.started is False:
584-
self._app_started_event()
585-
self._app_dependencies_loaded_event()
586-
self.started = True
587-
588582
def app_shutdown(self):
589583
self._app_closing_event()
590584
self.periodic(force_flush=True)
@@ -613,13 +607,10 @@ def _fork_writer(self):
613607
if self.status == ServiceStatus.STOPPED:
614608
return
615609

616-
atexit.unregister(self.stop)
617-
self.stop(join=False)
618-
619-
# Enable writer service in child process to avoid interpreter shutdown
620-
# error in Python 3.12
621-
if sys.version_info >= (3, 12):
622-
self.enable()
610+
# # Enable writer service in child process to avoid interpreter shutdown
611+
# # error in Python 3.12
612+
# if sys.version_info >= (3, 12):
613+
# self.enable()
623614

624615
def _restart_sequence(self):
625616
self._sequence = itertools.count(1)

ddtrace/internal/writer/writer.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -638,7 +638,8 @@ def _send_payload(self, payload, count, client):
638638
def start(self):
639639
super(AgentWriter, self).start()
640640
try:
641-
telemetry_writer.enable()
641+
telemetry_writer._app_started_event()
642+
telemetry_writer._app_dependencies_loaded_event()
642643

643644
# appsec remote config should be enabled/started after the global tracer and configs
644645
# are initialized

tests/telemetry/test_telemetry.py

Lines changed: 22 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,12 @@
88
def test_enable(test_agent_session, run_python_code_in_subprocess):
99
code = """
1010
from ddtrace.internal.telemetry import telemetry_writer
11+
from ddtrace.internal.service import ServiceStatus
12+
1113
telemetry_writer.enable()
14+
15+
assert telemetry_writer.status == ServiceStatus.RUNNING
16+
assert telemetry_writer._worker is not None
1217
"""
1318

1419
stdout, stderr, status, _ = run_python_code_in_subprocess(code)
@@ -17,26 +22,10 @@ def test_enable(test_agent_session, run_python_code_in_subprocess):
1722
assert stdout == b"", stderr
1823
assert stderr == b""
1924

20-
events = test_agent_session.get_events()
21-
assert len(events) == 3
22-
23-
# Same runtime id is used
24-
assert events[0]["runtime_id"] == events[1]["runtime_id"]
25-
assert events[0]["request_type"] == "app-closing"
26-
assert events[1]["request_type"] == "app-dependencies-loaded"
27-
assert events[2]["request_type"] == "app-started"
28-
assert events[2]["payload"]["error"] == {"code": 0, "message": ""}
29-
3025

3126
@pytest.mark.snapshot
3227
def test_telemetry_enabled_on_first_tracer_flush(test_agent_session, ddtrace_run_python_code_in_subprocess):
3328
"""assert telemetry events are generated after the first trace is flushed to the agent"""
34-
# Using ddtrace-run and/or importing ddtrace alone should not enable telemetry
35-
# Telemetry data should only be sent after the first trace to the agent
36-
_, stderr, status, _ = ddtrace_run_python_code_in_subprocess("import ddtrace")
37-
assert status == 0, stderr
38-
# No trace and No Telemetry
39-
assert len(test_agent_session.get_events()) == 0
4029

4130
# Submit a trace to the agent in a subprocess
4231
code = 'from ddtrace import tracer; span = tracer.trace("test-telemetry"); span.finish()'
@@ -65,6 +54,7 @@ def test_enable_fork(test_agent_session, run_python_code_in_subprocess):
6554
6655
# We have to start before forking since fork hooks are not enabled until after enabling
6756
telemetry_writer.enable()
57+
telemetry_writer._app_started_event()
6858
6959
if os.fork() == 0:
7060
# Send multiple started events to confirm none get sent
@@ -85,15 +75,12 @@ def test_enable_fork(test_agent_session, run_python_code_in_subprocess):
8575
requests = test_agent_session.get_requests()
8676

8777
# We expect 2 events from the parent process to get sent, but none from the child process
88-
assert len(requests) == 3
78+
assert len(requests) == 2
8979
# Validate that the runtime id sent for every event is the parent processes runtime id
9080
assert requests[0]["body"]["runtime_id"] == runtime_id
9181
assert requests[0]["body"]["request_type"] == "app-closing"
9282
assert requests[1]["body"]["runtime_id"] == runtime_id
93-
assert requests[1]["body"]["request_type"] == "app-dependencies-loaded"
94-
assert requests[1]["body"]["runtime_id"] == runtime_id
95-
assert requests[2]["body"]["request_type"] == "app-started"
96-
assert requests[2]["body"]["runtime_id"] == runtime_id
83+
assert requests[1]["body"]["request_type"] == "app-started"
9784

9885

9986
def test_enable_fork_heartbeat(test_agent_session, run_python_code_in_subprocess):
@@ -250,6 +237,9 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro
250237
251238
from ddtrace import patch, tracer
252239
patch(raise_errors=False, sqlite3=True)
240+
241+
# Create a span to start the telemetry writer
242+
tracer.trace("hi").finish()
253243
"""
254244

255245
_, stderr, status, _ = run_python_code_in_subprocess(code)
@@ -260,15 +250,11 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro
260250

261251
events = test_agent_session.get_events()
262252

263-
assert len(events) == 5
264-
# Same runtime id is used
265-
assert (
266-
events[0]["runtime_id"]
267-
== events[1]["runtime_id"]
268-
== events[2]["runtime_id"]
269-
== events[3]["runtime_id"]
270-
== events[4]["runtime_id"]
271-
)
253+
assert len(events) > 1
254+
for event in events:
255+
# Same runtime id is used
256+
assert event["runtime_id"] == events[0]["runtime_id"]
257+
272258
integrations_events = [event for event in events if event["request_type"] == "app-integrations-change"]
273259

274260
assert len(integrations_events) == 1
@@ -277,12 +263,14 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro
277263
== "failed to import ddtrace module 'ddtrace.contrib.sqlite3' when patching on import"
278264
)
279265

280-
metric_events = [event for event in events if event["request_type"] == "generate-metrics"]
281-
266+
metric_events = [
267+
event
268+
for event in events
269+
if event["request_type"] == "generate-metrics"
270+
and event["payload"]["series"][0]["metric"] == "integration_errors"
271+
]
282272
assert len(metric_events) == 1
283-
assert metric_events[0]["payload"]["namespace"] == "tracers"
284273
assert len(metric_events[0]["payload"]["series"]) == 1
285-
assert metric_events[0]["payload"]["series"][0]["metric"] == "integration_errors"
286274
assert metric_events[0]["payload"]["series"][0]["type"] == "count"
287275
assert len(metric_events[0]["payload"]["series"][0]["points"]) == 1
288276
assert metric_events[0]["payload"]["series"][0]["points"][0][1] == 1

tests/telemetry/test_telemetry_metrics_e2e.py

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import os
44
import subprocess
55
import sys
6-
import time
76

87
import pytest
98

@@ -28,8 +27,6 @@ def _build_env():
2827
def gunicorn_server(telemetry_metrics_enabled="true", token=None):
2928
cmd = ["ddtrace-run", "gunicorn", "-w", "1", "-b", "0.0.0.0:8000", "tests.telemetry.app:app"]
3029
env = _build_env()
31-
env["DD_TELEMETRY_METRICS_ENABLED"] = telemetry_metrics_enabled
32-
env["DD_TELEMETRY_HEARTBEAT_INTERVAL"] = "1.0"
3330
env["_DD_TRACE_WRITER_ADDITIONAL_HEADERS"] = "X-Datadog-Test-Session-Token:{}".format(token)
3431
env["DD_TRACE_AGENT_URL"] = os.environ.get("DD_TRACE_AGENT_URL", "")
3532
env["DD_TRACE_DEBUG"] = "true"
@@ -90,19 +87,15 @@ def test_telemetry_metrics_enabled_on_gunicorn_child_process(test_agent_session)
9087
gunicorn_client.get("/count_metric")
9188
response = gunicorn_client.get("/count_metric")
9289
assert response.status_code == 200
93-
# DD_TELEMETRY_HEARTBEAT_INTERVAL is set to 1 second
94-
time.sleep(1)
9590
gunicorn_client.get("/count_metric")
9691
response = gunicorn_client.get("/count_metric")
9792
assert response.status_code == 200
9893

9994
events = test_agent_session.get_events()
10095
metrics = list(filter(lambda event: event["request_type"] == "generate-metrics", events))
101-
assert len(metrics) == 2
96+
assert len(metrics) == 1
10297
assert metrics[0]["payload"]["series"][0]["metric"] == "test_metric"
103-
assert metrics[0]["payload"]["series"][0]["points"][0][1] == 2.0
104-
assert metrics[1]["payload"]["series"][0]["metric"] == "test_metric"
105-
assert metrics[1]["payload"]["series"][0]["points"][0][1] == 3.0
98+
assert metrics[0]["payload"]["series"][0]["points"][0][1] == 5
10699

107100

108101
def test_span_creation_and_finished_metrics_datadog(test_agent_session, ddtrace_run_python_code_in_subprocess):

tests/telemetry/test_writer.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -350,8 +350,8 @@ def test_send_failing_request(mock_status, telemetry_writer):
350350
with httpretty.enabled():
351351
httpretty.register_uri(httpretty.POST, telemetry_writer._client.url, status=mock_status)
352352
with mock.patch("ddtrace.internal.telemetry.writer.log") as log:
353-
# sends failing app-closing event
354-
telemetry_writer.app_shutdown()
353+
# sends failing app-heartbeat event
354+
telemetry_writer.periodic()
355355
# asserts unsuccessful status code was logged
356356
log.debug.assert_called_with(
357357
"failed to send telemetry to the Datadog Agent at %s. response: %s",
@@ -370,13 +370,11 @@ def test_telemetry_graceful_shutdown(telemetry_writer, test_agent_session, mock_
370370
telemetry_writer.app_shutdown()
371371

372372
events = test_agent_session.get_events()
373-
assert len(events) == 3
373+
assert len(events) == 1
374374

375375
# Reverse chronological order
376376
assert events[0]["request_type"] == "app-closing"
377-
assert events[0] == _get_request_body({}, "app-closing", 3)
378-
assert events[1]["request_type"] == "app-dependencies-loaded"
379-
assert events[2]["request_type"] == "app-started"
377+
assert events[0] == _get_request_body({}, "app-closing", 1)
380378

381379

382380
def test_app_heartbeat_event_periodic(mock_time, telemetry_writer, test_agent_session):
@@ -385,6 +383,7 @@ def test_app_heartbeat_event_periodic(mock_time, telemetry_writer, test_agent_se
385383

386384
# Ensure telemetry writer is initialized to send periodic events
387385
telemetry_writer._is_periodic = True
386+
telemetry_writer.started = True
388387
# Assert default telemetry interval is 10 seconds and the expected periodic threshold and counts are set
389388
assert telemetry_writer.interval == 10
390389
assert telemetry_writer._periodic_threshold == 5

0 commit comments

Comments
 (0)