From 36cda11abd37dd8fc4d6314c43203876d5082c98 Mon Sep 17 00:00:00 2001 From: Loren Brindze Date: Wed, 6 Apr 2022 14:11:21 -0700 Subject: [PATCH 1/2] handle uninitialized tasks --- .../instrumentation/celery/__init__.py | 13 ++++++++++--- .../opentelemetry/instrumentation/celery/utils.py | 3 +++ 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py index 6d79f45115..b54c7ee06a 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py @@ -183,10 +183,17 @@ def _trace_before_publish(self, *args, **kwargs): task = utils.retrieve_task_from_sender(kwargs) task_id = utils.retrieve_task_id_from_message(kwargs) - if task is None or task_id is None: + if task_id is None: return - operation_name = f"{_TASK_APPLY_ASYNC}/{task.name}" + if task is None: + # task is an anonymous task send using send_task or using canvas workflow + # Signatures() to send to a task not in the current processes dependency + # tree + task_name = kwargs.get("sender", "unknown") + else: + task_name = task.name + operation_name = f"{_TASK_APPLY_ASYNC}/{task_name}" span = self._tracer.start_span( operation_name, kind=trace.SpanKind.PRODUCER ) @@ -195,7 +202,7 @@ def _trace_before_publish(self, *args, **kwargs): if span.is_recording(): span.set_attribute(_TASK_TAG_KEY, _TASK_APPLY_ASYNC) span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, task_id) - span.set_attribute(_TASK_NAME_KEY, task.name) + span.set_attribute(_TASK_NAME_KEY, task_name) utils.set_attributes_from_context(span, kwargs) activation = trace.use_span(span, end_on_exit=True) diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py index 77abb89af8..0b02fc26a2 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py @@ -132,6 +132,9 @@ def attach_span(task, task_id, span, is_publish=False): NOTE: We cannot test for this well yet, because we do not run a celery worker, and cannot run `task.apply_async()` """ + if task is None: + return + span_dict = getattr(task, CTX_KEY, None) if span_dict is None: span_dict = {} From 71a5534c33ff3064e1700c0dae27d65070cb7935 Mon Sep 17 00:00:00 2001 From: Loren Brindze Date: Tue, 12 Apr 2022 17:09:10 -0700 Subject: [PATCH 2/2] updating tests --- .../instrumentation/celery/utils.py | 2 + .../tests/test_tasks.py | 43 +++++++++++++++++++ .../tests/test_utils.py | 8 ++++ 3 files changed, 53 insertions(+) diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py index 0b02fc26a2..e93cb57354 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py @@ -133,6 +133,8 @@ def attach_span(task, task_id, span, is_publish=False): and cannot run `task.apply_async()` """ if task is None: + # task objects can be optional, if None is passed in, there is nothing to attach + # the context state to... return span_dict = getattr(task, CTX_KEY, None) diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py index 6b2ee9a94c..8c8f0607a0 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py @@ -77,3 +77,46 @@ def test_task(self): self.assertNotEqual(consumer.parent, producer.context) self.assertEqual(consumer.parent.span_id, producer.context.span_id) self.assertEqual(consumer.context.trace_id, producer.context.trace_id) + + +class TestCelerySignatureTask(TestBase): + def setUp(self): + super().setUp() + + def start_app(*args, **kwargs): + # Add an additional task that will not be registered with parent thread + @app.task + def hidden_task(x): + return x * 2 + + self._worker = app.Worker(app=app, pool="solo", concurrency=1) + return self._worker.start(*args, **kwargs) + + self._thread = threading.Thread(target=start_app) + self._worker = app.Worker(app=app, pool="solo", concurrency=1) + self._thread.daemon = True + self._thread.start() + + def tearDown(self): + super().tearDown() + self._worker.stop() + self._thread.join() + + def test_hidden_task(self): + # no-op since already instrumented + CeleryInstrumentor().instrument() + import ipdb; ipdb.set_trace() + + res = app.signature("app.hidden_task", (2,)).apply_async() + while not res.ready(): + time.sleep(0.05) + + spans = self.sorted_spans(self.memory_exporter.get_finished_spans()) + self.assertEqual(len(spans), 1) + + producer = spans + + self.assertEqual( + producer.name, "apply_async/app.hidden_task" + ) + self.assertEqual(producer.kind, SpanKind.PRODUCER) diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_utils.py index 8b5352fe3a..e550dfa04c 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/test_utils.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_utils.py @@ -181,10 +181,18 @@ def fn_task(): task_id = "7c6731af-9533-40c3-83a9-25b58f0d837f" span = trace._Span("name", mock.Mock(spec=trace_api.SpanContext)) utils.attach_span(fn_task, task_id, span) + # delete the Span utils.detach_span(fn_task, task_id) self.assertEqual(utils.retrieve_span(fn_task, task_id), (None, None)) + def test_optional_task_span_attach(self): + task_id = "7c6731af-9533-40c3-83a9-25b58f0d837f" + span = trace._Span("name", mock.Mock(spec=trace_api.SpanContext)) + + # assert this is is a no-aop + self.assertIsNone(utils.attach_span(None, task_id, span)) + def test_span_delete_empty(self): # ensure detach_span doesn't raise an exception if span is not present @self.app.task