2929 Boto3SQSInstrumentor().instrument()
3030"""
3131import logging
32- from typing import Any , Collection , Dict , Generator , List , Optional
32+ from typing import Any , Collection , Dict , Generator , List , Mapping , Optional
3333
3434import boto3
3535import botocore .client
5353from .version import __version__
5454
5555_logger = logging .getLogger (__name__ )
56- # We use this prefix so we can request all instrumentation MessageAttributeNames with a wildcard, without harming
57- # existing filters
58- _OPENTELEMETRY_ATTRIBUTE_IDENTIFIER : str = "otel."
59- _OTEL_IDENTIFIER_LENGTH = len (_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER )
56+
57+ _IS_SQS_INSTRUMENTED_ATTRIBUTE = "_otel_boto3sqs_instrumented"
6058
6159
6260class Boto3SQSGetter (Getter [CarrierT ]):
6361 def get (self , carrier : CarrierT , key : str ) -> Optional [List [str ]]:
64- value = carrier .get (f"{ _OPENTELEMETRY_ATTRIBUTE_IDENTIFIER } { key } " , {})
65- if not value :
62+ msg_attr = carrier .get (key )
63+ if not isinstance (msg_attr , Mapping ):
64+ return None
65+
66+ value = msg_attr .get ("StringValue" )
67+ if value is None :
6668 return None
67- return [value .get ("StringValue" )]
69+
70+ return [value ]
6871
6972 def keys (self , carrier : CarrierT ) -> List [str ]:
70- return [
71- key [_OTEL_IDENTIFIER_LENGTH :]
72- if key .startswith (_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER )
73- else key
74- for key in carrier .keys ()
75- ]
73+ return list (carrier .keys ())
7674
7775
7876class Boto3SQSSetter (Setter [CarrierT ]):
7977 def set (self , carrier : CarrierT , key : str , value : str ) -> None :
8078 # This is a limitation defined by AWS for SQS MessageAttributes size
8179 if len (carrier .items ()) < 10 :
82- carrier [f" { _OPENTELEMETRY_ATTRIBUTE_IDENTIFIER } { key } " ] = {
80+ carrier [key ] = {
8381 "StringValue" : value ,
8482 "DataType" : "String" ,
8583 }
@@ -145,6 +143,7 @@ def instrumentation_dependencies(self) -> Collection[str]:
145143 def _enrich_span (
146144 span : Span ,
147145 queue_name : str ,
146+ queue_url : str ,
148147 conversation_id : Optional [str ] = None ,
149148 operation : Optional [MessagingOperationValues ] = None ,
150149 message_id : Optional [str ] = None ,
@@ -157,12 +156,12 @@ def _enrich_span(
157156 SpanAttributes .MESSAGING_DESTINATION_KIND ,
158157 MessagingDestinationKindValues .QUEUE .value ,
159158 )
159+ span .set_attribute (SpanAttributes .MESSAGING_URL , queue_url )
160+
160161 if operation :
161162 span .set_attribute (
162163 SpanAttributes .MESSAGING_OPERATION , operation .value
163164 )
164- else :
165- span .set_attribute (SpanAttributes .MESSAGING_TEMP_DESTINATION , True )
166165 if conversation_id :
167166 span .set_attribute (
168167 SpanAttributes .MESSAGING_CONVERSATION_ID , conversation_id
@@ -190,15 +189,19 @@ def _extract_queue_name_from_url(queue_url: str) -> str:
190189 return queue_url .split ("/" )[- 1 ]
191190
192191 def _create_processing_span (
193- self , queue_name : str , receipt_handle : str , message : Dict [str , Any ]
192+ self ,
193+ queue_name : str ,
194+ queue_url : str ,
195+ receipt_handle : str ,
196+ message : Dict [str , Any ],
194197 ) -> None :
195198 message_attributes = message .get ("MessageAttributes" , {})
196199 links = []
197200 ctx = propagate .extract (message_attributes , getter = boto3sqs_getter )
198- if ctx :
199- for item in ctx . values () :
200- if hasattr ( item , "get_span_context" ):
201- links . append ( Link ( context = item . get_span_context ()))
201+ parent_span_ctx = trace . get_current_span ( ctx ). get_span_context ()
202+ if parent_span_ctx . is_valid :
203+ links . append ( Link ( context = parent_span_ctx ))
204+
202205 span = self ._tracer .start_span (
203206 name = f"{ queue_name } process" , links = links , kind = SpanKind .CONSUMER
204207 )
@@ -208,11 +211,12 @@ def _create_processing_span(
208211 Boto3SQSInstrumentor ._enrich_span (
209212 span ,
210213 queue_name ,
214+ queue_url ,
211215 message_id = message_id ,
212216 operation = MessagingOperationValues .PROCESS ,
213217 )
214218
215- def _wrap_send_message (self ) -> None :
219+ def _wrap_send_message (self , sqs_class : type ) -> None :
216220 def send_wrapper (wrapped , instance , args , kwargs ):
217221 if context .get_value (_SUPPRESS_INSTRUMENTATION_KEY ):
218222 return wrapped (* args , ** kwargs )
@@ -227,7 +231,7 @@ def send_wrapper(wrapped, instance, args, kwargs):
227231 kind = SpanKind .PRODUCER ,
228232 end_on_exit = True ,
229233 ) as span :
230- Boto3SQSInstrumentor ._enrich_span (span , queue_name )
234+ Boto3SQSInstrumentor ._enrich_span (span , queue_name , queue_url )
231235 attributes = kwargs .pop ("MessageAttributes" , {})
232236 propagate .inject (attributes , setter = boto3sqs_setter )
233237 retval = wrapped (* args , MessageAttributes = attributes , ** kwargs )
@@ -239,9 +243,9 @@ def send_wrapper(wrapped, instance, args, kwargs):
239243 )
240244 return retval
241245
242- wrap_function_wrapper (self . _sqs_class , "send_message" , send_wrapper )
246+ wrap_function_wrapper (sqs_class , "send_message" , send_wrapper )
243247
244- def _wrap_send_message_batch (self ) -> None :
248+ def _wrap_send_message_batch (self , sqs_class : type ) -> None :
245249 def send_batch_wrapper (wrapped , instance , args , kwargs ):
246250 queue_url = kwargs .get ("QueueUrl" )
247251 entries = kwargs .get ("Entries" )
@@ -260,12 +264,11 @@ def send_batch_wrapper(wrapped, instance, args, kwargs):
260264 for entry in entries :
261265 entry_id = entry ["Id" ]
262266 span = self ._tracer .start_span (
263- name = f"{ queue_name } send" ,
264- kind = SpanKind .PRODUCER ,
267+ name = f"{ queue_name } send" , kind = SpanKind .PRODUCER
265268 )
266269 ids_to_spans [entry_id ] = span
267270 Boto3SQSInstrumentor ._enrich_span (
268- span , queue_name , conversation_id = entry_id
271+ span , queue_name , queue_url , conversation_id = entry_id
269272 )
270273 with trace .use_span (span ):
271274 if "MessageAttributes" not in entry :
@@ -288,15 +291,15 @@ def send_batch_wrapper(wrapped, instance, args, kwargs):
288291 return retval
289292
290293 wrap_function_wrapper (
291- self . _sqs_class , "send_message_batch" , send_batch_wrapper
294+ sqs_class , "send_message_batch" , send_batch_wrapper
292295 )
293296
294- def _wrap_receive_message (self ) -> None :
297+ def _wrap_receive_message (self , sqs_class : type ) -> None :
295298 def receive_message_wrapper (wrapped , instance , args , kwargs ):
296299 queue_url = kwargs .get ("QueueUrl" )
297300 message_attribute_names = kwargs .pop ("MessageAttributeNames" , [])
298- message_attribute_names .append (
299- f" { _OPENTELEMETRY_ATTRIBUTE_IDENTIFIER } *"
301+ message_attribute_names .extend (
302+ propagate . get_global_textmap (). fields
300303 )
301304 queue_name = Boto3SQSInstrumentor ._extract_queue_name_from_url (
302305 queue_url
@@ -309,6 +312,7 @@ def receive_message_wrapper(wrapped, instance, args, kwargs):
309312 Boto3SQSInstrumentor ._enrich_span (
310313 span ,
311314 queue_name ,
315+ queue_url ,
312316 operation = MessagingOperationValues .RECEIVE ,
313317 )
314318 retval = wrapped (
@@ -327,29 +331,31 @@ def receive_message_wrapper(wrapped, instance, args, kwargs):
327331 receipt_handle
328332 )
329333 self ._create_processing_span (
330- queue_name , receipt_handle , message
334+ queue_name , queue_url , receipt_handle , message
331335 )
332336 retval ["Messages" ] = Boto3SQSInstrumentor .ContextableList (
333337 messages
334338 )
335339 return retval
336340
337341 wrap_function_wrapper (
338- self . _sqs_class , "receive_message" , receive_message_wrapper
342+ sqs_class , "receive_message" , receive_message_wrapper
339343 )
340344
341- def _wrap_delete_message (self ) -> None :
345+ @staticmethod
346+ def _wrap_delete_message (sqs_class : type ) -> None :
342347 def delete_message_wrapper (wrapped , instance , args , kwargs ):
343348 receipt_handle = kwargs .get ("ReceiptHandle" )
344349 if receipt_handle :
345350 Boto3SQSInstrumentor ._safe_end_processing_span (receipt_handle )
346351 return wrapped (* args , ** kwargs )
347352
348353 wrap_function_wrapper (
349- self . _sqs_class , "delete_message" , delete_message_wrapper
354+ sqs_class , "delete_message" , delete_message_wrapper
350355 )
351356
352- def _wrap_delete_message_batch (self ) -> None :
357+ @staticmethod
358+ def _wrap_delete_message_batch (sqs_class : type ) -> None :
353359 def delete_message_wrapper_batch (wrapped , instance , args , kwargs ):
354360 entries = kwargs .get ("Entries" )
355361 for entry in entries :
@@ -361,9 +367,7 @@ def delete_message_wrapper_batch(wrapped, instance, args, kwargs):
361367 return wrapped (* args , ** kwargs )
362368
363369 wrap_function_wrapper (
364- self ._sqs_class ,
365- "delete_message_batch" ,
366- delete_message_wrapper_batch ,
370+ sqs_class , "delete_message_batch" , delete_message_wrapper_batch
367371 )
368372
369373 def _wrap_client_creation (self ) -> None :
@@ -375,52 +379,58 @@ def _wrap_client_creation(self) -> None:
375379
376380 def client_wrapper (wrapped , instance , args , kwargs ):
377381 retval = wrapped (* args , ** kwargs )
378- if not self ._did_decorate :
379- self ._decorate_sqs ()
382+ self ._decorate_sqs (type (retval ))
380383 return retval
381384
382385 wrap_function_wrapper (boto3 , "client" , client_wrapper )
383386
384- def _decorate_sqs (self ) -> None :
387+ def _decorate_sqs (self , sqs_class : type ) -> None :
385388 """
386389 Since botocore creates classes on the fly using schemas, we try to find the class that inherits from the base
387390 class and is SQS to wrap.
388391 """
389392 # We define SQS client as the only client that implements send_message_batch
390- sqs_class = [
391- cls
392- for cls in botocore .client .BaseClient .__subclasses__ ()
393- if hasattr (cls , "send_message_batch" )
394- ]
395- if sqs_class :
396- self ._sqs_class = sqs_class [0 ]
397- self ._did_decorate = True
398- self ._wrap_send_message ()
399- self ._wrap_send_message_batch ()
400- self ._wrap_receive_message ()
401- self ._wrap_delete_message ()
402- self ._wrap_delete_message_batch ()
403-
404- def _un_decorate_sqs (self ) -> None :
405- if self ._did_decorate :
406- unwrap (self ._sqs_class , "send_message" )
407- unwrap (self ._sqs_class , "send_message_batch" )
408- unwrap (self ._sqs_class , "receive_message" )
409- unwrap (self ._sqs_class , "delete_message" )
410- unwrap (self ._sqs_class , "delete_message_batch" )
411- self ._did_decorate = False
393+ if not hasattr (sqs_class , "send_message_batch" ):
394+ return
395+
396+ if getattr (sqs_class , _IS_SQS_INSTRUMENTED_ATTRIBUTE , False ):
397+ return
398+
399+ setattr (sqs_class , _IS_SQS_INSTRUMENTED_ATTRIBUTE , True )
400+
401+ self ._wrap_send_message (sqs_class )
402+ self ._wrap_send_message_batch (sqs_class )
403+ self ._wrap_receive_message (sqs_class )
404+ self ._wrap_delete_message (sqs_class )
405+ self ._wrap_delete_message_batch (sqs_class )
406+
407+ @staticmethod
408+ def _un_decorate_sqs (sqs_class : type ) -> None :
409+ if not getattr (sqs_class , _IS_SQS_INSTRUMENTED_ATTRIBUTE , False ):
410+ return
411+
412+ unwrap (sqs_class , "send_message" )
413+ unwrap (sqs_class , "send_message_batch" )
414+ unwrap (sqs_class , "receive_message" )
415+ unwrap (sqs_class , "delete_message" )
416+ unwrap (sqs_class , "delete_message_batch" )
417+
418+ setattr (sqs_class , _IS_SQS_INSTRUMENTED_ATTRIBUTE , False )
412419
413420 def _instrument (self , ** kwargs : Dict [str , Any ]) -> None :
414- self ._did_decorate : bool = False
415421 self ._tracer_provider : Optional [TracerProvider ] = kwargs .get (
416422 "tracer_provider"
417423 )
418424 self ._tracer : Tracer = trace .get_tracer (
419425 __name__ , __version__ , self ._tracer_provider
420426 )
421427 self ._wrap_client_creation ()
422- self ._decorate_sqs ()
428+
429+ for client_cls in botocore .client .BaseClient .__subclasses__ ():
430+ self ._decorate_sqs (client_cls )
423431
424432 def _uninstrument (self , ** kwargs : Dict [str , Any ]) -> None :
425433 unwrap (boto3 , "client" )
426- self ._un_decorate_sqs ()
434+
435+ for client_cls in botocore .client .BaseClient .__subclasses__ ():
436+ self ._un_decorate_sqs (client_cls )
0 commit comments