Skip to content

Extract span attrs from RQ job object & fix tests #3786

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions MIGRATION_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,18 @@ Looking to upgrade from Sentry SDK 2.x to 3.x? Here's a comprehensive list of wh
| `client` | `client.address`, `client.port` |
| full URL | `url.full` |

- If you're using the RQ integration, the `sampling_context` argument of `traces_sampler` doesn't contain the `rq_job` object anymore. Instead, the individual properties of the scope, if available, are accessible as follows:

| RQ property | Sampling context key(s) |
| --------------- | ---------------------------- |
| `rq_job.args` | `rq.job.args` |
| `rq_job.kwargs` | `rq.job.kwargs` |
| `rq_job.func` | `rq.job.func` |
| `queue.name` | `messaging.destination.name` |
| `job.id` | `messaging.message.id` |

Note that `rq.job.args`, `rq.job.kwargs`, and `rq.job.func` are serialized and not the actual objects on the job.

### Removed

- Spans no longer have a `description`. Use `name` instead.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,15 @@ def _root_span_to_transaction_event(self, span):

transaction_name, transaction_source = extract_transaction_name_source(span)
span_data = extract_span_data(span)
(_, description, status, http_status, _) = span_data

trace_context = get_trace_context(span, span_data=span_data)
contexts = {"trace": trace_context}

profile_context = get_profile_context(span)
if profile_context:
contexts["profile"] = profile_context

(_, description, _, http_status, _) = span_data

if http_status:
contexts["response"] = {"status_code": http_status}

Expand Down
17 changes: 14 additions & 3 deletions sentry_sdk/integrations/opentelemetry/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ def extract_span_data(span):
description = span.name
status, http_status = extract_span_status(span)
origin = None

if span.attributes is None:
return (op, description, status, http_status, origin)

Expand All @@ -133,11 +132,23 @@ def extract_span_data(span):

rpc_service = span.attributes.get(SpanAttributes.RPC_SERVICE)
if rpc_service:
return ("rpc", description, status, http_status, origin)
return (
span.attributes.get(SentrySpanAttribute.OP) or "rpc",
description,
status,
http_status,
origin,
)

messaging_system = span.attributes.get(SpanAttributes.MESSAGING_SYSTEM)
if messaging_system:
return ("message", description, status, http_status, origin)
return (
span.attributes.get(SentrySpanAttribute.OP) or "message",
description,
status,
http_status,
origin,
)

faas_trigger = span.attributes.get(SpanAttributes.FAAS_TRIGGER)
if faas_trigger:
Expand Down
45 changes: 41 additions & 4 deletions sentry_sdk/integrations/rq.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from sentry_sdk.integrations.logging import ignore_logger
from sentry_sdk.tracing import TRANSACTION_SOURCE_TASK
from sentry_sdk.utils import (
_serialize_span_attribute,
capture_internal_exceptions,
ensure_integration_enabled,
event_from_exception,
Expand Down Expand Up @@ -35,6 +36,15 @@
DEFAULT_TRANSACTION_NAME = "unknown RQ task"


JOB_PROPERTY_TO_ATTRIBUTE = {
"id": "messaging.message.id",
}

QUEUE_PROPERTY_TO_ATTRIBUTE = {
"name": "messaging.destination.name",
}


class RqIntegration(Integration):
identifier = "rq"
origin = f"auto.queue.{identifier}"
Expand All @@ -54,8 +64,8 @@ def setup_once():
old_perform_job = Worker.perform_job

@ensure_integration_enabled(RqIntegration, old_perform_job)
def sentry_patched_perform_job(self, job, *args, **kwargs):
# type: (Any, Job, *Queue, **Any) -> bool
def sentry_patched_perform_job(self, job, queue, *args, **kwargs):
Copy link
Contributor Author

@sentrivana sentrivana Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checked that the signature of perform_job is stable in the minimal supported version and in the current release. There are actually no args and kwargs, just job and queue, but let's do it like this for forwards compat.

# type: (Any, Job, Queue, *Any, **Any) -> bool
with sentry_sdk.new_scope() as scope:
try:
transaction_name = job.func_name or DEFAULT_TRANSACTION_NAME
Expand All @@ -76,9 +86,9 @@ def sentry_patched_perform_job(self, job, *args, **kwargs):
name=transaction_name,
source=TRANSACTION_SOURCE_TASK,
origin=RqIntegration.origin,
custom_sampling_context={"rq_job": job},
attributes=_prepopulate_attributes(job, queue),
):
rv = old_perform_job(self, job, *args, **kwargs)
rv = old_perform_job(self, job, queue, *args, **kwargs)

if self.is_horse:
# We're inside of a forked process and RQ is
Expand Down Expand Up @@ -167,3 +177,30 @@ def _capture_exception(exc_info, **kwargs):
)

sentry_sdk.capture_event(event, hint=hint)


def _prepopulate_attributes(job, queue):
# type: (Job, Queue) -> dict[str, Any]
attributes = {
"messaging.system": "rq",
}

for prop, attr in JOB_PROPERTY_TO_ATTRIBUTE.items():
if getattr(job, prop, None) is not None:
attributes[attr] = getattr(job, prop)

for prop, attr in QUEUE_PROPERTY_TO_ATTRIBUTE.items():
if getattr(queue, prop, None) is not None:
attributes[attr] = getattr(queue, prop)

for key in ("args", "kwargs"):
if getattr(job, key, None):
attributes[f"rq.job.{key}"] = _serialize_span_attribute(getattr(job, key))

func = job.func
if callable(func):
func = func.__name__

attributes["rq.job.func"] = _serialize_span_attribute(func)

return attributes
41 changes: 16 additions & 25 deletions tests/integrations/rq/test_rq.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ def test_transaction_with_error(
)

assert envelope["type"] == "transaction"
assert envelope["contexts"]["trace"] == error_event["contexts"]["trace"]
assert envelope["contexts"]["trace"] == DictionaryContaining(
error_event["contexts"]["trace"]
)
assert envelope["transaction"] == error_event["transaction"]
assert envelope["extra"]["rq-job"] == DictionaryContaining(
{
Expand Down Expand Up @@ -148,10 +150,7 @@ def test_error_has_trace_context_if_tracing_disabled(
assert error_event["contexts"]["trace"]


def test_tracing_enabled(
sentry_init,
capture_events,
):
def test_tracing_enabled(sentry_init, capture_events, DictionaryContaining):
sentry_init(integrations=[RqIntegration()], traces_sample_rate=1.0)
events = capture_events()

Expand All @@ -165,7 +164,10 @@ def test_tracing_enabled(

assert error_event["transaction"] == "tests.integrations.rq.test_rq.crashing_job"
assert transaction["transaction"] == "tests.integrations.rq.test_rq.crashing_job"
assert transaction["contexts"]["trace"] == error_event["contexts"]["trace"]
assert (
DictionaryContaining(error_event["contexts"]["trace"])
== transaction["contexts"]["trace"]
)


def test_tracing_disabled(
Expand Down Expand Up @@ -218,9 +220,7 @@ def test_transaction_no_error(
)


def test_traces_sampler_gets_correct_values_in_sampling_context(
sentry_init, DictionaryContaining, ObjectDescribedBy # noqa:N803
):
def test_traces_sampler_gets_correct_values_in_sampling_context(sentry_init):
traces_sampler = mock.Mock(return_value=True)
sentry_init(integrations=[RqIntegration()], traces_sampler=traces_sampler)

Expand All @@ -230,22 +230,13 @@ def test_traces_sampler_gets_correct_values_in_sampling_context(
queue.enqueue(do_trick, "Bodhi", trick="roll over")
worker.work(burst=True)

traces_sampler.assert_any_call(
DictionaryContaining(
{
"rq_job": ObjectDescribedBy(
type=rq.job.Job,
attrs={
"description": "tests.integrations.rq.test_rq.do_trick('Bodhi', trick='roll over')",
"result": "Bodhi, can you roll over? Good dog!",
"func_name": "tests.integrations.rq.test_rq.do_trick",
"args": ("Bodhi",),
"kwargs": {"trick": "roll over"},
},
),
}
)
)
sampling_context = traces_sampler.call_args_list[0][0][0]
assert sampling_context["messaging.system"] == "rq"
assert sampling_context["rq.job.args"] == ["Bodhi"]
assert sampling_context["rq.job.kwargs"] == '{"trick": "roll over"}'
assert sampling_context["rq.job.func"] == "do_trick"
assert sampling_context["messaging.message.id"]
assert sampling_context["messaging.destination.name"] == "default"


@pytest.mark.skipif(
Expand Down
Loading