diff --git a/.circleci/config.yml b/.circleci/config.yml index 77e7357a30f..1c910518daa 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -38,4 +38,4 @@ jobs: workflows: setup: jobs: - - setup + - setup \ No newline at end of file diff --git a/ddtrace/__init__.py b/ddtrace/__init__.py index df81fc4e37f..2f96bed64d2 100644 --- a/ddtrace/__init__.py +++ b/ddtrace/__init__.py @@ -25,6 +25,11 @@ telemetry.install_excepthook() +# In order to support 3.12, we start the writer upon initialization. +# See https://github.com/python/cpython/pull/104826. +# Telemetry events will only be sent after the `app-started` is queued. +# This will occur when the agent writer starts. +telemetry.telemetry_writer.enable() from ._monkey import patch # noqa: E402 from ._monkey import patch_all # noqa: E402 diff --git a/ddtrace/internal/telemetry/__init__.py b/ddtrace/internal/telemetry/__init__.py index e178bb874f4..e8d4395a90c 100644 --- a/ddtrace/internal/telemetry/__init__.py +++ b/ddtrace/internal/telemetry/__init__.py @@ -49,6 +49,13 @@ def _excepthook(tp, value, root_traceback): error_msg = "{}:{} {}".format(filename, lineno, str(value)) telemetry_writer.add_integration(integration_name, True, error_msg=error_msg) + if telemetry_writer.started is False: + telemetry_writer._app_started_event(False) + telemetry_writer._app_dependencies_loaded_event() + + telemetry_writer.app_shutdown() + telemetry_writer.disable() + return _ORIGINAL_EXCEPTHOOK(tp, value, root_traceback) diff --git a/ddtrace/internal/telemetry/data.py b/ddtrace/internal/telemetry/data.py index 2afe96bc5f3..bd334c7bc29 100644 --- a/ddtrace/internal/telemetry/data.py +++ b/ddtrace/internal/telemetry/data.py @@ -4,12 +4,12 @@ from typing import List from typing import Tuple -import ddtrace from ddtrace.internal.compat import PY3 from ddtrace.internal.constants import DEFAULT_SERVICE_NAME from ddtrace.internal.packages import get_distributions from ddtrace.internal.runtime.container import get_container_info from ddtrace.internal.utils.cache import cached +from ddtrace.version import get_version from ...settings import _config as config from ..hostname import get_hostname @@ -63,7 +63,7 @@ def _get_application(key): "env": env or "", "language_name": "python", "language_version": _format_version_info(sys.version_info), - "tracer_version": ddtrace.__version__, + "tracer_version": get_version(), "runtime_name": platform.python_implementation(), "runtime_version": _format_version_info(sys.implementation.version) if PY3 else "", "products": _get_products(), @@ -88,7 +88,7 @@ def get_application(service, version, env): def _get_products(): # type: () -> Dict return { - "appsec": {"version": ddtrace.__version__, "enabled": config._appsec_enabled}, + "appsec": {"version": get_version(), "enabled": config._appsec_enabled}, } diff --git a/ddtrace/internal/telemetry/writer.py b/ddtrace/internal/telemetry/writer.py index d2979c0a818..e295055b53d 100644 --- a/ddtrace/internal/telemetry/writer.py +++ b/ddtrace/internal/telemetry/writer.py @@ -207,7 +207,6 @@ def enable(self): if self._is_periodic: self.start() - atexit.register(self.app_shutdown) return True self.status = ServiceStatus.RUNNING @@ -281,14 +280,18 @@ def add_error(self, code, msg, filename, line_number): msg = "%s:%s: %s" % (filename, line_number, msg) self._error = (code, msg) - def _app_started_event(self): - # type: () -> None + def _app_started_event(self, register_app_shutdown=True): + # type: (bool) -> None """Sent when TelemetryWriter is enabled or forks""" if self._forked: # app-started events should only be sent by the main process return # List of configurations to be collected + self.started = True + if register_app_shutdown: + atexit.register(self.app_shutdown) + self.add_configurations( [ (TELEMETRY_TRACING_ENABLED, config._tracing_enabled, "unknown"), @@ -575,15 +578,6 @@ def periodic(self, force_flush=False): for telemetry_event in telemetry_events: self._client.send_event(telemetry_event) - def start(self, *args, **kwargs): - # type: (...) -> None - super(TelemetryWriter, self).start(*args, **kwargs) - # Queue app-started event after the telemetry worker thread is running - if self.started is False: - self._app_started_event() - self._app_dependencies_loaded_event() - self.started = True - def app_shutdown(self): self._app_closing_event() self.periodic(force_flush=True) @@ -616,8 +610,7 @@ def _fork_writer(self): # Enable writer service in child process to avoid interpreter shutdown # error in Python 3.12 - if sys.version_info >= (3, 12): - self.enable() + self.enable() def _restart_sequence(self): self._sequence = itertools.count(1) diff --git a/ddtrace/internal/writer/writer.py b/ddtrace/internal/writer/writer.py index a5e7696e043..109379f3e5e 100644 --- a/ddtrace/internal/writer/writer.py +++ b/ddtrace/internal/writer/writer.py @@ -626,7 +626,8 @@ def _send_payload(self, payload, count, client): def start(self): super(AgentWriter, self).start() try: - telemetry_writer.enable() + telemetry_writer._app_started_event() + telemetry_writer._app_dependencies_loaded_event() # appsec remote config should be enabled/started after the global tracer and configs # are initialized diff --git a/tests/telemetry/test_telemetry.py b/tests/telemetry/test_telemetry.py index 8e3aa8bea77..bc7ee6b2b7b 100644 --- a/tests/telemetry/test_telemetry.py +++ b/tests/telemetry/test_telemetry.py @@ -8,7 +8,12 @@ def test_enable(test_agent_session, run_python_code_in_subprocess): code = """ from ddtrace.internal.telemetry import telemetry_writer +from ddtrace.internal.service import ServiceStatus + telemetry_writer.enable() + +assert telemetry_writer.status == ServiceStatus.RUNNING +assert telemetry_writer._worker is not None """ stdout, stderr, status, _ = run_python_code_in_subprocess(code) @@ -17,26 +22,10 @@ def test_enable(test_agent_session, run_python_code_in_subprocess): assert stdout == b"", stderr assert stderr == b"" - events = test_agent_session.get_events() - assert len(events) == 3 - - # Same runtime id is used - assert events[0]["runtime_id"] == events[1]["runtime_id"] - assert events[0]["request_type"] == "app-closing" - assert events[1]["request_type"] == "app-dependencies-loaded" - assert events[2]["request_type"] == "app-started" - assert events[2]["payload"]["error"] == {"code": 0, "message": ""} - @pytest.mark.snapshot def test_telemetry_enabled_on_first_tracer_flush(test_agent_session, ddtrace_run_python_code_in_subprocess): """assert telemetry events are generated after the first trace is flushed to the agent""" - # Using ddtrace-run and/or importing ddtrace alone should not enable telemetry - # Telemetry data should only be sent after the first trace to the agent - _, stderr, status, _ = ddtrace_run_python_code_in_subprocess("import ddtrace") - assert status == 0, stderr - # No trace and No Telemetry - assert len(test_agent_session.get_events()) == 0 # Submit a trace to the agent in a subprocess code = 'from ddtrace import tracer; span = tracer.trace("test-telemetry"); span.finish()' @@ -58,6 +47,11 @@ def test_telemetry_enabled_on_first_tracer_flush(test_agent_session, ddtrace_run def test_enable_fork(test_agent_session, run_python_code_in_subprocess): """assert app-started/app-closing events are only sent in parent process""" code = """ +import warnings +# This test logs the following warning in py3.12: +# This process (pid=402) is multi-threaded, use of fork() may lead to deadlocks in the child +warnings.filterwarnings("ignore", category=DeprecationWarning) + import os from ddtrace.internal.runtime import get_runtime_id @@ -65,6 +59,7 @@ def test_enable_fork(test_agent_session, run_python_code_in_subprocess): # We have to start before forking since fork hooks are not enabled until after enabling telemetry_writer.enable() +telemetry_writer._app_started_event() if os.fork() == 0: # Send multiple started events to confirm none get sent @@ -78,27 +73,29 @@ def test_enable_fork(test_agent_session, run_python_code_in_subprocess): stdout, stderr, status, _ = run_python_code_in_subprocess(code) assert status == 0, stderr - assert stderr == b"" + assert stderr == b"", stderr runtime_id = stdout.strip().decode("utf-8") requests = test_agent_session.get_requests() # We expect 2 events from the parent process to get sent, but none from the child process - assert len(requests) == 3 + assert len(requests) == 2 # Validate that the runtime id sent for every event is the parent processes runtime id assert requests[0]["body"]["runtime_id"] == runtime_id assert requests[0]["body"]["request_type"] == "app-closing" assert requests[1]["body"]["runtime_id"] == runtime_id - assert requests[1]["body"]["request_type"] == "app-dependencies-loaded" - assert requests[1]["body"]["runtime_id"] == runtime_id - assert requests[2]["body"]["request_type"] == "app-started" - assert requests[2]["body"]["runtime_id"] == runtime_id + assert requests[1]["body"]["request_type"] == "app-started" def test_enable_fork_heartbeat(test_agent_session, run_python_code_in_subprocess): """assert app-heartbeat events are only sent in parent process when no other events are queued""" code = """ +import warnings +# This test logs the following warning in py3.12: +# This process (pid=402) is multi-threaded, use of fork() may lead to deadlocks in the child +warnings.filterwarnings("ignore", category=DeprecationWarning) + import os from ddtrace.internal.runtime import get_runtime_id @@ -120,7 +117,7 @@ def test_enable_fork_heartbeat(test_agent_session, run_python_code_in_subprocess stdout, stderr, status, _ = run_python_code_in_subprocess(code) assert status == 0, stderr - assert stderr == b"" + assert stderr == b"", stderr runtime_id = stdout.strip().decode("utf-8") @@ -138,6 +135,11 @@ def test_heartbeat_interval_configuration(run_python_code_in_subprocess): env = os.environ.copy() env["DD_TELEMETRY_HEARTBEAT_INTERVAL"] = "61" code = """ +import warnings +# This test logs the following warning in py3.12: +# This process (pid=402) is multi-threaded, use of fork() may lead to deadlocks in the child +warnings.filterwarnings("ignore", category=DeprecationWarning) + from ddtrace import config assert config._telemetry_heartbeat_interval == 61 @@ -156,6 +158,11 @@ def test_logs_after_fork(run_python_code_in_subprocess): # Regression test: telemetry writer should not log an error when a process forks _, err, status, _ = run_python_code_in_subprocess( """ +import warnings +# This test logs the following warning in py3.12: +# This process (pid=402) is multi-threaded, use of fork() may lead to deadlocks in the child +warnings.filterwarnings("ignore", category=DeprecationWarning) + import ddtrace import logging import os @@ -167,7 +174,7 @@ def test_logs_after_fork(run_python_code_in_subprocess): ) assert status == 0, err - assert err == b"" + assert err == b"", err def test_app_started_error_handled_exception(test_agent_session, run_python_code_in_subprocess): @@ -250,6 +257,9 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro from ddtrace import patch, tracer patch(raise_errors=False, sqlite3=True) + +# Create a span to start the telemetry writer +tracer.trace("hi").finish() """ _, stderr, status, _ = run_python_code_in_subprocess(code) @@ -260,15 +270,11 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro events = test_agent_session.get_events() - assert len(events) == 5 - # Same runtime id is used - assert ( - events[0]["runtime_id"] - == events[1]["runtime_id"] - == events[2]["runtime_id"] - == events[3]["runtime_id"] - == events[4]["runtime_id"] - ) + assert len(events) > 1 + for event in events: + # Same runtime id is used + assert event["runtime_id"] == events[0]["runtime_id"] + integrations_events = [event for event in events if event["request_type"] == "app-integrations-change"] assert len(integrations_events) == 1 @@ -277,12 +283,14 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro == "failed to import ddtrace module 'ddtrace.contrib.sqlite3' when patching on import" ) - metric_events = [event for event in events if event["request_type"] == "generate-metrics"] - + metric_events = [ + event + for event in events + if event["request_type"] == "generate-metrics" + and event["payload"]["series"][0]["metric"] == "integration_errors" + ] assert len(metric_events) == 1 - assert metric_events[0]["payload"]["namespace"] == "tracers" assert len(metric_events[0]["payload"]["series"]) == 1 - assert metric_events[0]["payload"]["series"][0]["metric"] == "integration_errors" assert metric_events[0]["payload"]["series"][0]["type"] == "count" assert len(metric_events[0]["payload"]["series"][0]["points"]) == 1 assert metric_events[0]["payload"]["series"][0]["points"][0][1] == 1 diff --git a/tests/telemetry/test_telemetry_metrics_e2e.py b/tests/telemetry/test_telemetry_metrics_e2e.py index 6755e387255..55f62589247 100644 --- a/tests/telemetry/test_telemetry_metrics_e2e.py +++ b/tests/telemetry/test_telemetry_metrics_e2e.py @@ -3,7 +3,6 @@ import os import subprocess import sys -import time import pytest @@ -28,8 +27,6 @@ def _build_env(): def gunicorn_server(telemetry_metrics_enabled="true", token=None): cmd = ["ddtrace-run", "gunicorn", "-w", "1", "-b", "0.0.0.0:8000", "tests.telemetry.app:app"] env = _build_env() - env["DD_TELEMETRY_METRICS_ENABLED"] = telemetry_metrics_enabled - env["DD_TELEMETRY_HEARTBEAT_INTERVAL"] = "1.0" env["_DD_TRACE_WRITER_ADDITIONAL_HEADERS"] = "X-Datadog-Test-Session-Token:{}".format(token) env["DD_TRACE_AGENT_URL"] = os.environ.get("DD_TRACE_AGENT_URL", "") env["DD_TRACE_DEBUG"] = "true" @@ -90,19 +87,15 @@ def test_telemetry_metrics_enabled_on_gunicorn_child_process(test_agent_session) gunicorn_client.get("/count_metric") response = gunicorn_client.get("/count_metric") assert response.status_code == 200 - # DD_TELEMETRY_HEARTBEAT_INTERVAL is set to 1 second - time.sleep(1) gunicorn_client.get("/count_metric") response = gunicorn_client.get("/count_metric") assert response.status_code == 200 events = test_agent_session.get_events() metrics = list(filter(lambda event: event["request_type"] == "generate-metrics", events)) - assert len(metrics) == 2 + assert len(metrics) == 1 assert metrics[0]["payload"]["series"][0]["metric"] == "test_metric" - assert metrics[0]["payload"]["series"][0]["points"][0][1] == 2.0 - assert metrics[1]["payload"]["series"][0]["metric"] == "test_metric" - assert metrics[1]["payload"]["series"][0]["points"][0][1] == 3.0 + assert metrics[0]["payload"]["series"][0]["points"][0][1] == 5 def test_span_creation_and_finished_metrics_datadog(test_agent_session, ddtrace_run_python_code_in_subprocess): diff --git a/tests/telemetry/test_writer.py b/tests/telemetry/test_writer.py index dd712ee771f..3a82e212d29 100644 --- a/tests/telemetry/test_writer.py +++ b/tests/telemetry/test_writer.py @@ -350,8 +350,8 @@ def test_send_failing_request(mock_status, telemetry_writer): with httpretty.enabled(): httpretty.register_uri(httpretty.POST, telemetry_writer._client.url, status=mock_status) with mock.patch("ddtrace.internal.telemetry.writer.log") as log: - # sends failing app-closing event - telemetry_writer.app_shutdown() + # sends failing app-heartbeat event + telemetry_writer.periodic() # asserts unsuccessful status code was logged log.debug.assert_called_with( "failed to send telemetry to the Datadog Agent at %s. response: %s", @@ -370,13 +370,11 @@ def test_telemetry_graceful_shutdown(telemetry_writer, test_agent_session, mock_ telemetry_writer.app_shutdown() events = test_agent_session.get_events() - assert len(events) == 3 + assert len(events) == 1 # Reverse chronological order assert events[0]["request_type"] == "app-closing" - assert events[0] == _get_request_body({}, "app-closing", 3) - assert events[1]["request_type"] == "app-dependencies-loaded" - assert events[2]["request_type"] == "app-started" + assert events[0] == _get_request_body({}, "app-closing", 1) def test_app_heartbeat_event_periodic(mock_time, telemetry_writer, test_agent_session): @@ -385,6 +383,7 @@ def test_app_heartbeat_event_periodic(mock_time, telemetry_writer, test_agent_se # Ensure telemetry writer is initialized to send periodic events telemetry_writer._is_periodic = True + telemetry_writer.started = True # Assert default telemetry interval is 10 seconds and the expected periodic threshold and counts are set assert telemetry_writer.interval == 10 assert telemetry_writer._periodic_threshold == 5