diff --git a/sentry_sdk/integrations/asyncpg.py b/sentry_sdk/integrations/asyncpg.py index b05d5615ba..71740cb3aa 100644 --- a/sentry_sdk/integrations/asyncpg.py +++ b/sentry_sdk/integrations/asyncpg.py @@ -40,7 +40,6 @@ def setup_once() -> None: asyncpg.Connection.execute = _wrap_execute( asyncpg.Connection.execute, ) - asyncpg.Connection._execute = _wrap_connection_method( asyncpg.Connection._execute ) @@ -80,8 +79,8 @@ async def _inner(*args: Any, **kwargs: Any) -> T: ) as span: res = await f(*args, **kwargs) - with capture_internal_exceptions(): - add_query_source(span) + with capture_internal_exceptions(): + add_query_source(span) return res @@ -148,7 +147,7 @@ def _inner(*args: Any, **kwargs: Any) -> T: # noqa: N807 ) as span: _set_db_data(span, args[0]) res = f(*args, **kwargs) - span.set_data("db.cursor", res) + span.set_attribute("db.cursor", str(res)) return res @@ -168,21 +167,37 @@ async def _inner(*args: Any, **kwargs: Any) -> T: name="connect", origin=AsyncPGIntegration.origin, ) as span: - span.set_data(SPANDATA.DB_SYSTEM, "postgresql") + span.set_attribute(SPANDATA.DB_SYSTEM, "postgresql") addr = kwargs.get("addr") if addr: try: - span.set_data(SPANDATA.SERVER_ADDRESS, addr[0]) - span.set_data(SPANDATA.SERVER_PORT, addr[1]) + span.set_attribute(SPANDATA.SERVER_ADDRESS, addr[0]) + span.set_attribute(SPANDATA.SERVER_PORT, addr[1]) except IndexError: pass - span.set_data(SPANDATA.DB_NAME, database) - span.set_data(SPANDATA.DB_USER, user) + + span.set_attribute(SPANDATA.DB_NAME, database) + span.set_attribute(SPANDATA.DB_USER, user) with capture_internal_exceptions(): + data = {} + for attr in ( + "db.cursor", + "db.params", + "db.paramstyle", + SPANDATA.DB_NAME, + SPANDATA.DB_SYSTEM, + SPANDATA.DB_USER, + SPANDATA.SERVER_ADDRESS, + SPANDATA.SERVER_PORT, + ): + if span.get_attribute(attr): + data[attr] = span.get_attribute(attr) + sentry_sdk.add_breadcrumb( - message="connect", category="query", data=span._data + message="connect", category="query", data=data ) + res = await f(*args, **kwargs) return res @@ -191,20 +206,20 @@ async def _inner(*args: Any, **kwargs: Any) -> T: def _set_db_data(span: Span, conn: Any) -> None: - span.set_data(SPANDATA.DB_SYSTEM, "postgresql") + span.set_attribute(SPANDATA.DB_SYSTEM, "postgresql") addr = conn._addr if addr: try: - span.set_data(SPANDATA.SERVER_ADDRESS, addr[0]) - span.set_data(SPANDATA.SERVER_PORT, addr[1]) + span.set_attribute(SPANDATA.SERVER_ADDRESS, addr[0]) + span.set_attribute(SPANDATA.SERVER_PORT, addr[1]) except IndexError: pass database = conn._params.database if database: - span.set_data(SPANDATA.DB_NAME, database) + span.set_attribute(SPANDATA.DB_NAME, database) user = conn._params.user if user: - span.set_data(SPANDATA.DB_USER, user) + span.set_attribute(SPANDATA.DB_USER, user) diff --git a/sentry_sdk/tracing_utils.py b/sentry_sdk/tracing_utils.py index 2f4dad738a..b8f7288374 100644 --- a/sentry_sdk/tracing_utils.py +++ b/sentry_sdk/tracing_utils.py @@ -4,7 +4,7 @@ import re import sys from collections.abc import Mapping -from datetime import timedelta +from datetime import datetime, timedelta, timezone from functools import wraps from urllib.parse import quote, unquote import uuid @@ -133,13 +133,13 @@ def record_sql_queries( data = {} if params_list is not None: - data["db.params"] = params_list + data["db.params"] = str(params_list) if paramstyle is not None: - data["db.paramstyle"] = paramstyle + data["db.paramstyle"] = str(paramstyle) if executemany: data["db.executemany"] = True if record_cursor_repr and cursor is not None: - data["db.cursor"] = cursor + data["db.cursor"] = str(cursor) with capture_internal_exceptions(): sentry_sdk.add_breadcrumb(message=query, category="query", data=data) @@ -209,14 +209,17 @@ def add_query_source(span): if not client.is_active(): return - if span.timestamp is None or span.start_timestamp is None: + if span.start_timestamp is None: return should_add_query_source = client.options.get("enable_db_query_source", True) if not should_add_query_source: return - duration = span.timestamp - span.start_timestamp + # We assume here that the query is just ending now. We can't use + # the actual end timestamp of the span because in OTel the span + # can't be finished in order to set any attributes on it. + duration = datetime.now(tz=timezone.utc) - span.start_timestamp threshold = client.options.get("db_query_source_threshold_ms", 0) slow_query = duration / timedelta(milliseconds=1) > threshold diff --git a/tests/integrations/asyncpg/test_asyncpg.py b/tests/integrations/asyncpg/test_asyncpg.py index 8996c8dd1a..adeef37d38 100644 --- a/tests/integrations/asyncpg/test_asyncpg.py +++ b/tests/integrations/asyncpg/test_asyncpg.py @@ -10,14 +10,6 @@ """ import os - - -PG_HOST = os.getenv("SENTRY_PYTHON_TEST_POSTGRES_HOST", "localhost") -PG_PORT = int(os.getenv("SENTRY_PYTHON_TEST_POSTGRES_PORT", "5432")) -PG_USER = os.getenv("SENTRY_PYTHON_TEST_POSTGRES_USER", "postgres") -PG_PASSWORD = os.getenv("SENTRY_PYTHON_TEST_POSTGRES_PASSWORD", "sentry") -PG_NAME = os.getenv("SENTRY_PYTHON_TEST_POSTGRES_NAME", "postgres") - import datetime from contextlib import contextmanager from unittest import mock @@ -28,16 +20,23 @@ from asyncpg import connect, Connection from freezegun import freeze_time -from sentry_sdk import capture_message, start_transaction +from sentry_sdk import capture_message, start_span from sentry_sdk.integrations.asyncpg import AsyncPGIntegration from sentry_sdk.consts import SPANDATA from sentry_sdk.tracing_utils import record_sql_queries from tests.conftest import ApproxDict +PG_HOST = os.getenv("SENTRY_PYTHON_TEST_POSTGRES_HOST", "localhost") +PG_PORT = int(os.getenv("SENTRY_PYTHON_TEST_POSTGRES_PORT", "5432")) +PG_USER = os.getenv("SENTRY_PYTHON_TEST_POSTGRES_USER", "postgres") +PG_PASSWORD = os.getenv("SENTRY_PYTHON_TEST_POSTGRES_PASSWORD", "sentry") +PG_NAME = os.getenv("SENTRY_PYTHON_TEST_POSTGRES_NAME", "postgres") + PG_CONNECTION_URI = "postgresql://{}:{}@{}/{}".format( PG_USER, PG_PASSWORD, PG_HOST, PG_NAME ) + CRUMBS_CONNECT = { "category": "query", "data": ApproxDict( @@ -75,6 +74,7 @@ async def _clean_pg(): async def test_connect(sentry_init, capture_events) -> None: sentry_init( integrations=[AsyncPGIntegration()], + traces_sample_rate=1.0, _experiments={"record_sql_params": True}, ) events = capture_events() @@ -85,7 +85,7 @@ async def test_connect(sentry_init, capture_events) -> None: capture_message("hi") - (event,) = events + event = events[-1] for crumb in event["breadcrumbs"]["values"]: del crumb["timestamp"] @@ -97,6 +97,7 @@ async def test_connect(sentry_init, capture_events) -> None: async def test_execute(sentry_init, capture_events) -> None: sentry_init( integrations=[AsyncPGIntegration()], + traces_sample_rate=1.0, _experiments={"record_sql_params": True}, ) events = capture_events() @@ -124,7 +125,7 @@ async def test_execute(sentry_init, capture_events) -> None: capture_message("hi") - (event,) = events + event = events[-1] for crumb in event["breadcrumbs"]["values"]: del crumb["timestamp"] @@ -162,6 +163,7 @@ async def test_execute(sentry_init, capture_events) -> None: async def test_execute_many(sentry_init, capture_events) -> None: sentry_init( integrations=[AsyncPGIntegration()], + traces_sample_rate=1.0, _experiments={"record_sql_params": True}, ) events = capture_events() @@ -180,7 +182,7 @@ async def test_execute_many(sentry_init, capture_events) -> None: capture_message("hi") - (event,) = events + event = events[-1] for crumb in event["breadcrumbs"]["values"]: del crumb["timestamp"] @@ -200,6 +202,7 @@ async def test_execute_many(sentry_init, capture_events) -> None: async def test_record_params(sentry_init, capture_events) -> None: sentry_init( integrations=[AsyncPGIntegration(record_params=True)], + traces_sample_rate=1.0, _experiments={"record_sql_params": True}, ) events = capture_events() @@ -217,7 +220,7 @@ async def test_record_params(sentry_init, capture_events) -> None: capture_message("hi") - (event,) = events + event = events[-1] for crumb in event["breadcrumbs"]["values"]: del crumb["timestamp"] @@ -227,7 +230,7 @@ async def test_record_params(sentry_init, capture_events) -> None: { "category": "query", "data": { - "db.params": ["Bob", "secret_pw", "datetime.date(1984, 3, 1)"], + "db.params": "('Bob', 'secret_pw', datetime.date(1984, 3, 1))", "db.paramstyle": "format", }, "message": "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)", @@ -240,6 +243,7 @@ async def test_record_params(sentry_init, capture_events) -> None: async def test_cursor(sentry_init, capture_events) -> None: sentry_init( integrations=[AsyncPGIntegration()], + traces_sample_rate=1.0, _experiments={"record_sql_params": True}, ) events = capture_events() @@ -260,13 +264,13 @@ async def test_cursor(sentry_init, capture_events) -> None: async for record in conn.cursor( "SELECT * FROM users WHERE dob > $1", datetime.date(1970, 1, 1) ): - print(record) + pass await conn.close() capture_message("hi") - (event,) = events + event = events[-1] for crumb in event["breadcrumbs"]["values"]: del crumb["timestamp"] @@ -279,14 +283,24 @@ async def test_cursor(sentry_init, capture_events) -> None: "message": "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)", "type": "default", }, - {"category": "query", "data": {}, "message": "BEGIN;", "type": "default"}, + { + "category": "query", + "data": {}, + "message": "BEGIN;", + "type": "default", + }, { "category": "query", "data": {}, "message": "SELECT * FROM users WHERE dob > $1", "type": "default", }, - {"category": "query", "data": {}, "message": "COMMIT;", "type": "default"}, + { + "category": "query", + "data": {}, + "message": "COMMIT;", + "type": "default", + }, ] @@ -294,6 +308,7 @@ async def test_cursor(sentry_init, capture_events) -> None: async def test_cursor_manual(sentry_init, capture_events) -> None: sentry_init( integrations=[AsyncPGIntegration()], + traces_sample_rate=1.0, _experiments={"record_sql_params": True}, ) events = capture_events() @@ -307,24 +322,22 @@ async def test_cursor_manual(sentry_init, capture_events) -> None: ("Alice", "pw", datetime.date(1990, 12, 25)), ], ) - # + async with conn.transaction(): # Postgres requires non-scrollable cursors to be created # and used in a transaction. cur = await conn.cursor( "SELECT * FROM users WHERE dob > $1", datetime.date(1970, 1, 1) ) - record = await cur.fetchrow() - print(record) + await cur.fetchrow() while await cur.forward(1): - record = await cur.fetchrow() - print(record) + await cur.fetchrow() await conn.close() capture_message("hi") - (event,) = events + event = events[-1] for crumb in event["breadcrumbs"]["values"]: del crumb["timestamp"] @@ -337,14 +350,24 @@ async def test_cursor_manual(sentry_init, capture_events) -> None: "message": "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)", "type": "default", }, - {"category": "query", "data": {}, "message": "BEGIN;", "type": "default"}, + { + "category": "query", + "data": {}, + "message": "BEGIN;", + "type": "default", + }, { "category": "query", "data": {}, "message": "SELECT * FROM users WHERE dob > $1", "type": "default", }, - {"category": "query", "data": {}, "message": "COMMIT;", "type": "default"}, + { + "category": "query", + "data": {}, + "message": "COMMIT;", + "type": "default", + }, ] @@ -352,6 +375,7 @@ async def test_cursor_manual(sentry_init, capture_events) -> None: async def test_prepared_stmt(sentry_init, capture_events) -> None: sentry_init( integrations=[AsyncPGIntegration()], + traces_sample_rate=1.0, _experiments={"record_sql_params": True}, ) events = capture_events() @@ -368,14 +392,14 @@ async def test_prepared_stmt(sentry_init, capture_events) -> None: stmt = await conn.prepare("SELECT * FROM users WHERE name = $1") - print(await stmt.fetchval("Bob")) - print(await stmt.fetchval("Alice")) + await stmt.fetchval("Bob") + await stmt.fetchval("Alice") await conn.close() capture_message("hi") - (event,) = events + event = events[-1] for crumb in event["breadcrumbs"]["values"]: del crumb["timestamp"] @@ -401,6 +425,7 @@ async def test_prepared_stmt(sentry_init, capture_events) -> None: async def test_connection_pool(sentry_init, capture_events) -> None: sentry_init( integrations=[AsyncPGIntegration()], + traces_sample_rate=1.0, _experiments={"record_sql_params": True}, ) events = capture_events() @@ -427,7 +452,7 @@ async def test_connection_pool(sentry_init, capture_events) -> None: capture_message("hi") - (event,) = events + event = events[-1] for crumb in event["breadcrumbs"]["values"]: del crumb["timestamp"] @@ -481,7 +506,7 @@ async def test_query_source_disabled(sentry_init, capture_events): events = capture_events() - with start_transaction(name="test_transaction", sampled=True): + with start_span(name="test_span"): conn: Connection = await connect(PG_CONNECTION_URI) await conn.execute( @@ -520,7 +545,7 @@ async def test_query_source_enabled( events = capture_events() - with start_transaction(name="test_transaction", sampled=True): + with start_span(name="test_span"): conn: Connection = await connect(PG_CONNECTION_URI) await conn.execute( @@ -553,7 +578,7 @@ async def test_query_source(sentry_init, capture_events): events = capture_events() - with start_transaction(name="test_transaction", sampled=True): + with start_span(name="test_span"): conn: Connection = await connect(PG_CONNECTION_URI) await conn.execute( @@ -605,7 +630,7 @@ async def test_query_source_with_module_in_search_path(sentry_init, capture_even from asyncpg_helpers.helpers import execute_query_in_connection - with start_transaction(name="test_transaction", sampled=True): + with start_span(name="test_span"): conn: Connection = await connect(PG_CONNECTION_URI) await execute_query_in_connection( @@ -649,27 +674,26 @@ async def test_no_query_source_if_duration_too_short(sentry_init, capture_events events = capture_events() - with start_transaction(name="test_transaction", sampled=True): + with start_span(name="test_span"): conn: Connection = await connect(PG_CONNECTION_URI) @contextmanager def fake_record_sql_queries(*args, **kwargs): - with freeze_time(datetime(2024, 1, 1, microsecond=0)): + with freeze_time(datetime.datetime(2024, 1, 1, microsecond=99999)): with record_sql_queries(*args, **kwargs) as span: - freezer = freeze_time(datetime(2024, 1, 1, microsecond=99999)) - freezer.start() - - freezer.stop() - - yield span + yield span with mock.patch( - "sentry_sdk.integrations.asyncpg.record_sql_queries", - fake_record_sql_queries, + "sentry_sdk.tracing.POTelSpan.start_timestamp", + datetime.datetime(2024, 1, 1, microsecond=0, tzinfo=datetime.timezone.utc), ): - await conn.execute( - "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", - ) + with mock.patch( + "sentry_sdk.integrations.asyncpg.record_sql_queries", + fake_record_sql_queries, + ): + await conn.execute( + "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", + ) await conn.close() @@ -697,27 +721,26 @@ async def test_query_source_if_duration_over_threshold(sentry_init, capture_even events = capture_events() - with start_transaction(name="test_transaction", sampled=True): + with start_span(name="test_span"): conn: Connection = await connect(PG_CONNECTION_URI) @contextmanager def fake_record_sql_queries(*args, **kwargs): - with freeze_time(datetime(2024, 1, 1, microsecond=0)): + with freeze_time(datetime.datetime(2024, 1, 1, microsecond=100001)): with record_sql_queries(*args, **kwargs) as span: - freezer = freeze_time(datetime(2024, 1, 1, microsecond=100001)) - freezer.start() - - freezer.stop() - - yield span + yield span with mock.patch( - "sentry_sdk.integrations.asyncpg.record_sql_queries", - fake_record_sql_queries, + "sentry_sdk.tracing.POTelSpan.start_timestamp", + datetime.datetime(2024, 1, 1, microsecond=0, tzinfo=datetime.timezone.utc), ): - await conn.execute( - "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", - ) + with mock.patch( + "sentry_sdk.integrations.asyncpg.record_sql_queries", + fake_record_sql_queries, + ): + await conn.execute( + "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", + ) await conn.close() @@ -760,7 +783,7 @@ async def test_span_origin(sentry_init, capture_events): events = capture_events() - with start_transaction(name="test_transaction"): + with start_span(name="test_span"): conn: Connection = await connect(PG_CONNECTION_URI) await conn.execute("SELECT 1")