From 2f549fb6b67cf6c23d613ae13e31842155492a1c Mon Sep 17 00:00:00 2001 From: zhengkezhou1 Date: Sun, 11 May 2025 05:00:46 +0800 Subject: [PATCH] don't change semantic conventions --- .../instrumentation/confluent_kafka/utils.py | 27 +++++--- .../tests/test_instrumentation.py | 67 ++++++++++--------- 2 files changed, 53 insertions(+), 41 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py index 60dc13e675..0b784a4a73 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py @@ -3,9 +3,17 @@ from opentelemetry import context, propagate from opentelemetry.propagators import textmap +from opentelemetry.semconv._incubating.attributes.messaging_attributes import ( + MESSAGING_DESTINATION_NAME, + MESSAGING_DESTINATION_TEMPORARY, + MESSAGING_KAFKA_DESTINATION_PARTITION, + MESSAGING_MESSAGE_ID, + MESSAGING_OPERATION, + MESSAGING_SYSTEM, + MessagingOperationTypeValues, +) from opentelemetry.semconv.trace import ( MessagingDestinationKindValues, - MessagingOperationValues, SpanAttributes, ) from opentelemetry.trace import Link, SpanKind @@ -114,32 +122,29 @@ def _enrich_span( topic, partition: Optional[int] = None, offset: Optional[int] = None, - operation: Optional[MessagingOperationValues] = None, + operation: Optional[MessagingOperationTypeValues] = None, ): if not span.is_recording(): return - span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "kafka") - span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, topic) - + span.set_attribute(MESSAGING_SYSTEM, "kafka") + span.set_attribute(MESSAGING_DESTINATION_NAME, topic) if partition is not None: - span.set_attribute(SpanAttributes.MESSAGING_KAFKA_PARTITION, partition) - + span.set_attribute(MESSAGING_KAFKA_DESTINATION_PARTITION, partition) span.set_attribute( SpanAttributes.MESSAGING_DESTINATION_KIND, MessagingDestinationKindValues.QUEUE.value, ) - if operation: - span.set_attribute(SpanAttributes.MESSAGING_OPERATION, operation.value) + span.set_attribute(MESSAGING_OPERATION, operation.value) else: - span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True) + span.set_attribute(MESSAGING_DESTINATION_TEMPORARY, True) # https://stackoverflow.com/questions/65935155/identify-and-find-specific-message-in-kafka-topic # A message within Kafka is uniquely defined by its topic name, topic partition and offset. if partition is not None and offset is not None and topic: span.set_attribute( - SpanAttributes.MESSAGING_MESSAGE_ID, + MESSAGING_MESSAGE_ID, f"{topic}.{partition}.{offset}", ) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index 986116900d..4cd6616d9a 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -25,6 +25,13 @@ KafkaContextGetter, KafkaContextSetter, ) +from opentelemetry.semconv._incubating.attributes.messaging_attributes import ( + MESSAGING_DESTINATION_NAME, + MESSAGING_KAFKA_DESTINATION_PARTITION, + MESSAGING_MESSAGE_ID, + MESSAGING_OPERATION, + MESSAGING_SYSTEM, +) from opentelemetry.semconv.trace import ( MessagingDestinationKindValues, SpanAttributes, @@ -122,36 +129,36 @@ def test_poll(self) -> None: { "name": "topic-10 process", "attributes": { - SpanAttributes.MESSAGING_OPERATION: "process", - SpanAttributes.MESSAGING_KAFKA_PARTITION: 0, - SpanAttributes.MESSAGING_SYSTEM: "kafka", - SpanAttributes.MESSAGING_DESTINATION: "topic-10", + MESSAGING_OPERATION: "process", + MESSAGING_KAFKA_DESTINATION_PARTITION: 0, + MESSAGING_SYSTEM: "kafka", + MESSAGING_DESTINATION_NAME: "topic-10", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, - SpanAttributes.MESSAGING_MESSAGE_ID: "topic-10.0.0", + MESSAGING_MESSAGE_ID: "topic-10.0.0", }, }, {"name": "recv", "attributes": {}}, { "name": "topic-20 process", "attributes": { - SpanAttributes.MESSAGING_OPERATION: "process", - SpanAttributes.MESSAGING_KAFKA_PARTITION: 2, - SpanAttributes.MESSAGING_SYSTEM: "kafka", - SpanAttributes.MESSAGING_DESTINATION: "topic-20", + MESSAGING_OPERATION: "process", + MESSAGING_KAFKA_DESTINATION_PARTITION: 2, + MESSAGING_SYSTEM: "kafka", + MESSAGING_DESTINATION_NAME: "topic-20", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, - SpanAttributes.MESSAGING_MESSAGE_ID: "topic-20.2.4", + MESSAGING_MESSAGE_ID: "topic-20.2.4", }, }, {"name": "recv", "attributes": {}}, { "name": "topic-30 process", "attributes": { - SpanAttributes.MESSAGING_OPERATION: "process", - SpanAttributes.MESSAGING_KAFKA_PARTITION: 1, - SpanAttributes.MESSAGING_SYSTEM: "kafka", - SpanAttributes.MESSAGING_DESTINATION: "topic-30", + MESSAGING_OPERATION: "process", + MESSAGING_KAFKA_DESTINATION_PARTITION: 1, + MESSAGING_SYSTEM: "kafka", + MESSAGING_DESTINATION_NAME: "topic-30", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, - SpanAttributes.MESSAGING_MESSAGE_ID: "topic-30.1.3", + MESSAGING_MESSAGE_ID: "topic-30.1.3", }, }, {"name": "recv", "attributes": {}}, @@ -190,9 +197,9 @@ def test_consume(self) -> None: { "name": "topic-1 process", "attributes": { - SpanAttributes.MESSAGING_OPERATION: "process", - SpanAttributes.MESSAGING_SYSTEM: "kafka", - SpanAttributes.MESSAGING_DESTINATION: "topic-1", + MESSAGING_OPERATION: "process", + MESSAGING_SYSTEM: "kafka", + MESSAGING_DESTINATION_NAME: "topic-1", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, }, }, @@ -200,9 +207,9 @@ def test_consume(self) -> None: { "name": "topic-2 process", "attributes": { - SpanAttributes.MESSAGING_OPERATION: "process", - SpanAttributes.MESSAGING_SYSTEM: "kafka", - SpanAttributes.MESSAGING_DESTINATION: "topic-2", + MESSAGING_OPERATION: "process", + MESSAGING_SYSTEM: "kafka", + MESSAGING_DESTINATION_NAME: "topic-2", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, }, }, @@ -210,9 +217,9 @@ def test_consume(self) -> None: { "name": "topic-3 process", "attributes": { - SpanAttributes.MESSAGING_OPERATION: "process", - SpanAttributes.MESSAGING_SYSTEM: "kafka", - SpanAttributes.MESSAGING_DESTINATION: "topic-3", + MESSAGING_OPERATION: "process", + MESSAGING_SYSTEM: "kafka", + MESSAGING_DESTINATION_NAME: "topic-3", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, }, }, @@ -247,12 +254,12 @@ def test_close(self) -> None: { "name": "topic-a process", "attributes": { - SpanAttributes.MESSAGING_OPERATION: "process", - SpanAttributes.MESSAGING_KAFKA_PARTITION: 0, - SpanAttributes.MESSAGING_SYSTEM: "kafka", - SpanAttributes.MESSAGING_DESTINATION: "topic-a", + MESSAGING_OPERATION: "process", + MESSAGING_KAFKA_DESTINATION_PARTITION: 0, + MESSAGING_SYSTEM: "kafka", + MESSAGING_DESTINATION_NAME: "topic-a", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, - SpanAttributes.MESSAGING_MESSAGE_ID: "topic-a.0.0", + MESSAGING_MESSAGE_ID: "topic-a.0.0", }, }, ] @@ -286,7 +293,7 @@ def _compare_spans(self, spans, expected_spans): def _assert_topic(self, span, expected_topic: str) -> None: self.assertEqual( - span.attributes[SpanAttributes.MESSAGING_DESTINATION], + span.attributes[MESSAGING_DESTINATION_NAME], expected_topic, )