diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py index 60dc13e675..87b640f139 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py @@ -3,9 +3,16 @@ from opentelemetry import context, propagate from opentelemetry.propagators import textmap +from opentelemetry.semconv._incubating.attributes.messaging_attributes import ( + MESSAGING_DESTINATION_TEMPORARY, + MESSAGING_KAFKA_DESTINATION_PARTITION, + MESSAGING_MESSAGE_ID, + MESSAGING_OPERATION, + MESSAGING_SYSTEM, + MessagingOperationTypeValues, +) from opentelemetry.semconv.trace import ( MessagingDestinationKindValues, - MessagingOperationValues, SpanAttributes, ) from opentelemetry.trace import Link, SpanKind @@ -114,32 +121,29 @@ def _enrich_span( topic, partition: Optional[int] = None, offset: Optional[int] = None, - operation: Optional[MessagingOperationValues] = None, + operation: Optional[MessagingOperationTypeValues] = None, ): if not span.is_recording(): return - span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "kafka") + span.set_attribute(MESSAGING_SYSTEM, "kafka") span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, topic) - if partition is not None: - span.set_attribute(SpanAttributes.MESSAGING_KAFKA_PARTITION, partition) - + span.set_attribute(MESSAGING_KAFKA_DESTINATION_PARTITION, partition) span.set_attribute( SpanAttributes.MESSAGING_DESTINATION_KIND, MessagingDestinationKindValues.QUEUE.value, ) - if operation: - span.set_attribute(SpanAttributes.MESSAGING_OPERATION, operation.value) + span.set_attribute(MESSAGING_OPERATION, operation.value) else: - span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True) + span.set_attribute(MESSAGING_DESTINATION_TEMPORARY, True) # https://stackoverflow.com/questions/65935155/identify-and-find-specific-message-in-kafka-topic # A message within Kafka is uniquely defined by its topic name, topic partition and offset. if partition is not None and offset is not None and topic: span.set_attribute( - SpanAttributes.MESSAGING_MESSAGE_ID, + MESSAGING_MESSAGE_ID, f"{topic}.{partition}.{offset}", ) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index 986116900d..90f1db50de 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -25,6 +25,12 @@ KafkaContextGetter, KafkaContextSetter, ) +from opentelemetry.semconv._incubating.attributes.messaging_attributes import ( + MESSAGING_KAFKA_DESTINATION_PARTITION, + MESSAGING_MESSAGE_ID, + MESSAGING_OPERATION, + MESSAGING_SYSTEM, +) from opentelemetry.semconv.trace import ( MessagingDestinationKindValues, SpanAttributes, @@ -122,36 +128,36 @@ def test_poll(self) -> None: { "name": "topic-10 process", "attributes": { - SpanAttributes.MESSAGING_OPERATION: "process", - SpanAttributes.MESSAGING_KAFKA_PARTITION: 0, - SpanAttributes.MESSAGING_SYSTEM: "kafka", + MESSAGING_OPERATION: "process", + MESSAGING_KAFKA_DESTINATION_PARTITION: 0, + MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-10", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, - SpanAttributes.MESSAGING_MESSAGE_ID: "topic-10.0.0", + MESSAGING_MESSAGE_ID: "topic-10.0.0", }, }, {"name": "recv", "attributes": {}}, { "name": "topic-20 process", "attributes": { - SpanAttributes.MESSAGING_OPERATION: "process", - SpanAttributes.MESSAGING_KAFKA_PARTITION: 2, - SpanAttributes.MESSAGING_SYSTEM: "kafka", + MESSAGING_OPERATION: "process", + MESSAGING_KAFKA_DESTINATION_PARTITION: 2, + MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-20", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, - SpanAttributes.MESSAGING_MESSAGE_ID: "topic-20.2.4", + MESSAGING_MESSAGE_ID: "topic-20.2.4", }, }, {"name": "recv", "attributes": {}}, { "name": "topic-30 process", "attributes": { - SpanAttributes.MESSAGING_OPERATION: "process", - SpanAttributes.MESSAGING_KAFKA_PARTITION: 1, - SpanAttributes.MESSAGING_SYSTEM: "kafka", + MESSAGING_OPERATION: "process", + MESSAGING_KAFKA_DESTINATION_PARTITION: 1, + MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-30", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, - SpanAttributes.MESSAGING_MESSAGE_ID: "topic-30.1.3", + MESSAGING_MESSAGE_ID: "topic-30.1.3", }, }, {"name": "recv", "attributes": {}}, @@ -190,8 +196,8 @@ def test_consume(self) -> None: { "name": "topic-1 process", "attributes": { - SpanAttributes.MESSAGING_OPERATION: "process", - SpanAttributes.MESSAGING_SYSTEM: "kafka", + MESSAGING_OPERATION: "process", + MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-1", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, }, @@ -200,8 +206,8 @@ def test_consume(self) -> None: { "name": "topic-2 process", "attributes": { - SpanAttributes.MESSAGING_OPERATION: "process", - SpanAttributes.MESSAGING_SYSTEM: "kafka", + MESSAGING_OPERATION: "process", + MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-2", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, }, @@ -210,8 +216,8 @@ def test_consume(self) -> None: { "name": "topic-3 process", "attributes": { - SpanAttributes.MESSAGING_OPERATION: "process", - SpanAttributes.MESSAGING_SYSTEM: "kafka", + MESSAGING_OPERATION: "process", + MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-3", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, }, @@ -247,12 +253,12 @@ def test_close(self) -> None: { "name": "topic-a process", "attributes": { - SpanAttributes.MESSAGING_OPERATION: "process", - SpanAttributes.MESSAGING_KAFKA_PARTITION: 0, - SpanAttributes.MESSAGING_SYSTEM: "kafka", + MESSAGING_OPERATION: "process", + MESSAGING_KAFKA_DESTINATION_PARTITION: 0, + MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-a", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, - SpanAttributes.MESSAGING_MESSAGE_ID: "topic-a.0.0", + MESSAGING_MESSAGE_ID: "topic-a.0.0", }, }, ]