Skip to content

Celery publish from celery creating separate traces #609

@zionsofer

Description

@zionsofer

Describe your environment
python==3.7.4/3.7.6
Platform==Local - MacOS(Darwin-20.5.0-x86_64-i386-64bit), Remote - Linux (many distros)
Otel release==0.22b0 (verified on OTEL main as well - same code and same behavior)

Steps to reproduce
When running two Celery apps where one calls another, and both workers are instrumented with OpenTelemetry, the second Celery worker creates its span in a separate trace from the first one.

Reproduction example:
otel_celery_first.py

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor

from celery import Celery
from celery.signals import worker_process_init


@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    trace.set_tracer_provider(TracerProvider())
    span_processor = BatchSpanProcessor(ConsoleSpanExporter())
    trace.get_tracer_provider().add_span_processor(span_processor)
    CeleryInstrumentor().instrument()


app = Celery("first", broker="amqp://localhost")


@app.task(name="first.ping")
def ping():
    print("first ping")
    second_app = Celery("second", broker="amqp://localhost")
    second_app.send_task("second.ping", queue="second")

otel_celery_second.py

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor

from celery import Celery
from celery.signals import worker_process_init


@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    trace.set_tracer_provider(TracerProvider())
    span_processor = BatchSpanProcessor(ConsoleSpanExporter())
    trace.get_tracer_provider().add_span_processor(span_processor)
    CeleryInstrumentor().instrument()


app = Celery("second", broker="amqp://localhost")


@app.task(name="second.ping")
def ping():
    print("second ping")

test_celery.py

from celery import Celery

app = Celery("first", broker="amqp://localhost")

app.send_task("first.ping", queue="first")

Running the workers:
celery -A otel_celery_first worker -n first@%h -Q first
celery -A otel_celery_second worker -n second@%h -Q second

Sending the task:
python test_celery.py

What is the expected behavior?
Two spans, with the same trace ID.

First celery worker output:

[2021-07-27 20:35:03,691: WARNING/ForkPoolWorker-8] first ping
{
    "name": "run/first.ping",
    "context": {
        "trace_id": "0x15ac5bd9daf196693bf11a2dc5973763",
        "span_id": "0x83b707a77a03780a",
        "trace_state": "[]"
    },
    "kind": "SpanKind.CONSUMER",
    "parent_id": null,
    "start_time": "2021-07-27T17:35:03.691780Z",
    "end_time": "2021-07-27T17:35:03.724748Z",
    "status": {
        "status_code": "UNSET"
    },
    "attributes": {
        "celery.action": "run",
        "celery.state": "SUCCESS",
        "messaging.conversation_id": "8bc5049e-0ad0-461f-a91b-e2e182ffd30d",
        "messaging.destination": "first",
        "celery.delivery_info": "{'exchange': '', 'routing_key': 'first', 'priority': 0, 'redelivered': False}",
        "messaging.message_id": "8bc5049e-0ad0-461f-a91b-e2e182ffd30d",
        "celery.reply_to": "bf7eae06-9d67-35ba-976c-4e58d3612c7e",
        "celery.hostname": "gen39617@Zion-MacBook-Pro",
        "celery.task_name": "first.ping"
    },
    "events": [],
    "links": [],
    "resource": {
        "telemetry.sdk.language": "python",
        "telemetry.sdk.name": "opentelemetry",
        "telemetry.sdk.version": "1.1.0",
        "service.name": "unknown_service"
    }
}

Second celery worker output:

[2021-07-27 20:35:03,731: WARNING/ForkPoolWorker-8] second ping
{
    "name": "run/second.ping",
    "context": {
        "trace_id": "0x15ac5bd9daf196693bf11a2dc5973763",
        "span_id": "0xa8ff39fb540c86cc",
        "trace_state": "[]"
    },
    "kind": "SpanKind.CONSUMER",
    "parent_id": null,
    "start_time": "2021-07-27T17:35:03.731708Z",
    "end_time": "2021-07-27T17:35:03.733191Z",
    "status": {
        "status_code": "UNSET"
    },
    "attributes": {
        "celery.action": "run",
        "celery.state": "SUCCESS",
        "messaging.conversation_id": "6dead959-c702-4aed-960d-68641bec23e4",
        "messaging.destination": "second",
        "celery.delivery_info": "{'exchange': '', 'routing_key': 'second', 'priority': 0, 'redelivered': False}",
        "messaging.message_id": "6dead959-c702-4aed-960d-68641bec23e4",
        "celery.reply_to": "dbf19329-41f0-32da-9b83-626ea38407f8",
        "celery.hostname": "gen39573@Zion-MacBook-Pro",
        "celery.task_name": "second.ping"
    },
    "events": [],
    "links": [],
    "resource": {
        "telemetry.sdk.language": "python",
        "telemetry.sdk.name": "opentelemetry",
        "telemetry.sdk.version": "1.1.0",
        "service.name": "unknown_service"
    }
}

What is the actual behavior?
Two spans, with a different trace ID for each:

First celery worker output:

[2021-07-27 20:35:03,691: WARNING/ForkPoolWorker-8] first ping
{
    "name": "run/first.ping",
    "context": {
        "trace_id": "0x15ac5bd9daf196693bf11a2dc5973763",
        "span_id": "0x83b707a77a03780a",
        "trace_state": "[]"
    },
    # same attributes
}

Second celery worker output:

[2021-07-27 20:35:03,731: WARNING/ForkPoolWorker-8] second ping
{
    "name": "run/second.ping",
    "context": {
        "trace_id": "0x2488345758db06e139f722b4bba08cb3,
        "span_id": "0xa8ff39fb540c86cc",
        "trace_state": "[]"
    },
    # same attributes
}

I would also expect an apply_async span, which is likewise missing from the first worker's output.

Additional context
I believe this has got to do with the _trace_before_publish signal handler:

def _trace_before_publish(self, *args, **kwargs):
    task = utils.retrieve_task_from_sender(kwargs)
    task_id = utils.retrieve_task_id_from_message(kwargs)

    if task is None or task_id is None:
        return

The handler tries to retrieve the task from the Celery registry by the sender name, i.e. the name of the task being sent. However, the first worker does not explicitly declare the tasks it sends as part of its own registry, so the lookup returns None and the function exits prematurely — before context propagation headers can be injected. Perhaps there's a better way to handle this?

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug (Something isn't working), triaged

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions