Skip to content

Commit 2440ddb

Browse files
feat(celery): Queues module producer implementation
Fixes GH-3078
1 parent 00111d5 commit 2440ddb

File tree

3 files changed

+77
-0
lines changed

3 files changed

+77
-0
lines changed

sentry_sdk/consts.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,7 @@ class OP:
388388
LANGCHAIN_AGENT = "ai.agent.langchain"
389389
LANGCHAIN_CHAT_COMPLETIONS_CREATE = "ai.chat_completions.create.langchain"
390390
QUEUE_PROCESS = "queue.process"
391+
QUEUE_PUBLISH = "queue.publish"
391392
QUEUE_SUBMIT_ARQ = "queue.submit.arq"
392393
QUEUE_TASK_ARQ = "queue.task.arq"
393394
QUEUE_SUBMIT_CELERY = "queue.submit.celery"

sentry_sdk/integrations/celery/__init__.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import sys
2+
from collections.abc import Mapping
23
from functools import wraps
34

45
import sentry_sdk
@@ -47,6 +48,7 @@
4748
Retry,
4849
SoftTimeLimitExceeded,
4950
)
51+
from kombu import Producer # type: ignore
5052
except ImportError:
5153
raise DidNotEnable("Celery not installed")
5254

@@ -82,6 +84,7 @@ def setup_once():
8284
_patch_build_tracer()
8385
_patch_task_apply_async()
8486
_patch_worker_exit()
87+
_patch_producer_publish()
8588

8689
# This logger logs every status of every task that ran on the worker.
8790
# Meaning that every task's breadcrumbs are full of stuff like "Task
@@ -433,3 +436,44 @@ def sentry_workloop(*args, **kwargs):
433436
sentry_sdk.flush()
434437

435438
Worker.workloop = sentry_workloop
439+
440+
441+
def _patch_producer_publish():
442+
# type: () -> None
443+
original_publish = Producer.publish
444+
445+
@ensure_integration_enabled(CeleryIntegration, original_publish)
446+
def sentry_publish(self, *args, **kwargs):
447+
# type: (Producer, *Any, **Any) -> Any
448+
kwargs_headers = kwargs.get("headers", {})
449+
if not isinstance(kwargs_headers, Mapping):
450+
# Ensure kwargs_headers is a Mapping, so we can safely call get()
451+
kwargs_headers = {}
452+
453+
task_name = kwargs_headers.get("task")
454+
task_id = kwargs_headers.get("id")
455+
retries = kwargs_headers.get("retries")
456+
457+
routing_key = kwargs.get("routing_key")
458+
exchange = kwargs.get("exchange")
459+
460+
with sentry_sdk.start_span(op=OP.QUEUE_PUBLISH, description=task_name) as span:
461+
if task_id is not None:
462+
span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task_id)
463+
464+
if exchange == "" and routing_key is not None:
465+
# Empty exchange indicates the default exchange, meaning messages are
466+
# routed to the queue with the same name as the routing key.
467+
span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)
468+
469+
if retries is not None:
470+
span.set_data(SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, retries)
471+
472+
with capture_internal_exceptions():
473+
span.set_data(
474+
SPANDATA.MESSAGING_SYSTEM, self.connection.transport.driver_type
475+
)
476+
477+
return original_publish(self, *args, **kwargs)
478+
479+
Producer.publish = sentry_publish

tests/integrations/celery/test_celery.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import threading
2+
import kombu
23
from unittest import mock
34

45
import pytest
@@ -722,3 +723,34 @@ def task(): ...
722723
(event,) = events
723724
(span,) = event["spans"]
724725
assert span["data"]["messaging.system"] == system
726+
727+
728+
@pytest.mark.parametrize("system", ("amqp", "redis"))
729+
def test_producer_span_data(system, monkeypatch, sentry_init, capture_events):
730+
old_publish = kombu.messaging.Producer._publish
731+
732+
def publish(*args, **kwargs):
733+
pass
734+
735+
monkeypatch.setattr(kombu.messaging.Producer, "_publish", publish)
736+
737+
sentry_init(integrations=[CeleryIntegration()], enable_tracing=True)
738+
celery = Celery(__name__, broker=f"{system}://example.com") # noqa: E231
739+
events = capture_events()
740+
741+
@celery.task()
742+
def task(): ...
743+
744+
with start_transaction():
745+
task.apply_async()
746+
747+
(event,) = events
748+
span = next(span for span in event["spans"] if span["op"] == "queue.publish")
749+
750+
assert span["data"]["messaging.system"] == system
751+
752+
assert span["data"]["messaging.destination.name"] == "celery"
753+
assert "messaging.message.id" in span["data"]
754+
assert span["data"]["messaging.message.retry.count"] == 0
755+
756+
monkeypatch.setattr(kombu.messaging.Producer, "_publish", old_publish)

0 commit comments

Comments
 (0)