diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/sqs.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/sqs.py index 83d8e0af33..ce48826e44 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/sqs.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/sqs.py @@ -15,7 +15,12 @@ from opentelemetry.instrumentation.botocore.extensions.types import ( _AttributeMapT, _AwsSdkExtension, + _BotoResultT, ) +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.trace.span import Span + +_SUPPORTED_OPERATIONS = ["SendMessage", "SendMessageBatch", "ReceiveMessage"] class _SqsExtension(_AwsSdkExtension): @@ -24,3 +29,27 @@ def extract_attributes(self, attributes: _AttributeMapT): if queue_url: # TODO: update when semantic conventions exist attributes["aws.queue_url"] = queue_url + attributes[SpanAttributes.MESSAGING_SYSTEM] = "aws.sqs" + attributes[SpanAttributes.MESSAGING_URL] = queue_url + attributes[SpanAttributes.MESSAGING_DESTINATION] = queue_url.split( + "/" + )[-1] + + def on_success(self, span: Span, result: _BotoResultT): + operation = self._call_context.operation + if operation in _SUPPORTED_OPERATIONS: + if operation == "SendMessage": + span.set_attribute( + SpanAttributes.MESSAGING_MESSAGE_ID, + result.get("MessageId"), + ) + elif operation == "SendMessageBatch" and result.get("Successful"): + span.set_attribute( + SpanAttributes.MESSAGING_MESSAGE_ID, + result["Successful"][0]["MessageId"], + ) + elif operation == "ReceiveMessage" and result.get("Messages"): + span.set_attribute( + SpanAttributes.MESSAGING_MESSAGE_ID, + result["Messages"][0]["MessageId"], + ) \ No newline at end of file diff --git a/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_sqs.py b/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_sqs.py new file mode 100644 index 0000000000..78e4b6acf0 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_sqs.py @@ -0,0 +1,136 @@ +import botocore.session +from moto import mock_sqs + +from opentelemetry.instrumentation.botocore import BotocoreInstrumentor +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.test.test_base import TestBase + + +class TestDynamoDbExtension(TestBase): + def setUp(self): + super().setUp() + BotocoreInstrumentor().instrument() + + session = botocore.session.get_session() + session.set_credentials( + access_key="access-key", secret_key="secret-key" + ) + self.region = "us-west-2" + self.client = session.create_client("sqs", region_name=self.region) + + def tearDown(self): + super().tearDown() + BotocoreInstrumentor().uninstrument() + + @mock_sqs + def test_sqs_messaging_send_message(self): + create_queue_result = self.client.create_queue( + QueueName="test_queue_name" + ) + queue_url = create_queue_result["QueueUrl"] + response = self.client.send_message( + QueueUrl=queue_url, MessageBody="content" + ) + + spans = self.memory_exporter.get_finished_spans() + assert spans + self.assertEqual(len(spans), 2) + span = spans[1] + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_SYSTEM], "aws.sqs" + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_URL], queue_url + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_DESTINATION], + "test_queue_name", + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_MESSAGE_ID], + response["MessageId"], + ) + + @mock_sqs + def test_sqs_messaging_send_message_batch(self): + create_queue_result = self.client.create_queue( + QueueName="test_queue_name" + ) + queue_url = create_queue_result["QueueUrl"] + response = self.client.send_message_batch( + QueueUrl=queue_url, + Entries=[ + {"Id": "1", "MessageBody": "content"}, + {"Id": "2", "MessageBody": "content2"}, + ], + ) + + spans = self.memory_exporter.get_finished_spans() + assert spans + self.assertEqual(len(spans), 2) + span = spans[1] + self.assertEqual(span.attributes["rpc.method"], "SendMessageBatch") + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_SYSTEM], "aws.sqs" + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_URL], queue_url + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_DESTINATION], + "test_queue_name", + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_MESSAGE_ID], + response["Successful"][0]["MessageId"], + ) + + @mock_sqs + def test_sqs_messaging_receive_message(self): + create_queue_result = self.client.create_queue( + QueueName="test_queue_name" + ) + queue_url = create_queue_result["QueueUrl"] + self.client.send_message(QueueUrl=queue_url, MessageBody="content") + message_result = self.client.receive_message( + QueueUrl=create_queue_result["QueueUrl"] + ) + + spans = self.memory_exporter.get_finished_spans() + assert spans + self.assertEqual(len(spans), 3) + span = spans[-1] + self.assertEqual(span.attributes["rpc.method"], "ReceiveMessage") + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_SYSTEM], "aws.sqs" + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_URL], queue_url + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_DESTINATION], + "test_queue_name", + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_MESSAGE_ID], + message_result["Messages"][0]["MessageId"], + ) + + @mock_sqs + def test_sqs_messaging_failed_operation(self): + with self.assertRaises(Exception): + self.client.send_message( + QueueUrl="non-existing", MessageBody="content" + ) + + spans = self.memory_exporter.get_finished_spans() + assert spans + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertEqual(span.attributes["rpc.method"], "SendMessage") + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_SYSTEM], "aws.sqs" + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_URL], "non-existing" + ) \ No newline at end of file