@@ -108,7 +108,7 @@ def serve():
108108 logging.basicConfig()
109109 serve()
110110
111- You can also add the instrumentor manually, rather than using
111+ You can also add the interceptor manually, rather than using
112112:py:class:`~opentelemetry.instrumentation.grpc.GrpcInstrumentorServer`:
113113
114114.. code-block:: python
@@ -118,6 +118,117 @@ def serve():
118118 server = grpc.server(futures.ThreadPoolExecutor(),
119119 interceptors = [server_interceptor()])
120120
121+ Usage Aio Client
122+ ----------------
123+ .. code-block:: python
124+
125+ import logging
126+ import asyncio
127+
128+ import grpc
129+
130+ from opentelemetry import trace
131+ from opentelemetry.instrumentation.grpc import GrpcAioInstrumentorClient
132+ from opentelemetry.sdk.trace import TracerProvider
133+ from opentelemetry.sdk.trace.export import (
134+ ConsoleSpanExporter,
135+ SimpleSpanProcessor,
136+ )
137+
138+ try:
139+ from .gen import helloworld_pb2, helloworld_pb2_grpc
140+ except ImportError:
141+ from gen import helloworld_pb2, helloworld_pb2_grpc
142+
143+ trace.set_tracer_provider(TracerProvider())
144+ trace.get_tracer_provider().add_span_processor(
145+ SimpleSpanProcessor(ConsoleSpanExporter())
146+ )
147+
148+ grpc_client_instrumentor = GrpcAioInstrumentorClient()
149+ grpc_client_instrumentor.instrument()
150+
151+ async def run():
152+ with grpc.aio.insecure_channel("localhost:50051") as channel:
153+
154+ stub = helloworld_pb2_grpc.GreeterStub(channel)
155+ response = await stub.SayHello(helloworld_pb2.HelloRequest(name="YOU"))
156+
157+ print("Greeter client received: " + response.message)
158+
159+
160+ if __name__ == "__main__":
161+ logging.basicConfig()
162+ asyncio.run(run())
163+
164+ You can also add the interceptor manually, rather than using
165+ :py:class:`~opentelemetry.instrumentation.grpc.GrpcAioInstrumentorClient`:
166+
167+ .. code-block:: python
168+
169+ from opentelemetry.instrumentation.grpc import aio_client_interceptors
170+
171+ channel = grpc.aio.insecure_channel("localhost:12345", interceptors=aio_client_interceptors())
172+
173+
174+ Usage Aio Server
175+ ----------------
176+ .. code-block:: python
177+
178+ import logging
179+ import asyncio
180+
181+ import grpc
182+
183+ from opentelemetry import trace
184+ from opentelemetry.instrumentation.grpc import GrpcAioInstrumentorServer
185+ from opentelemetry.sdk.trace import TracerProvider
186+ from opentelemetry.sdk.trace.export import (
187+ ConsoleSpanExporter,
188+ SimpleSpanProcessor,
189+ )
190+
191+ try:
192+ from .gen import helloworld_pb2, helloworld_pb2_grpc
193+ except ImportError:
194+ from gen import helloworld_pb2, helloworld_pb2_grpc
195+
196+ trace.set_tracer_provider(TracerProvider())
197+ trace.get_tracer_provider().add_span_processor(
198+ SimpleSpanProcessor(ConsoleSpanExporter())
199+ )
200+
201+ grpc_server_instrumentor = GrpcAioInstrumentorServer()
202+ grpc_server_instrumentor.instrument()
203+
204+ class Greeter(helloworld_pb2_grpc.GreeterServicer):
205+ async def SayHello(self, request, context):
206+ return helloworld_pb2.HelloReply(message="Hello, %s!" % request.name)
207+
208+
209+ async def serve():
210+
211+ server = grpc.aio.server()
212+
213+ helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
214+ server.add_insecure_port("[::]:50051")
215+ await server.start()
216+ await server.wait_for_termination()
217+
218+
219+ if __name__ == "__main__":
220+ logging.basicConfig()
221+ asyncio.run(serve())
222+
223+ You can also add the interceptor manually, rather than using
224+ :py:class:`~opentelemetry.instrumentation.grpc.GrpcAioInstrumentorServer`:
225+
226+ .. code-block:: python
227+
228+ from opentelemetry.instrumentation.grpc import aio_server_interceptor
229+
230+ server = grpc.aio.server(interceptors = [aio_server_interceptor()])
231+
121232Filters
122233-------
123234
@@ -244,6 +355,58 @@ def _uninstrument(self, **kwargs):
244355 grpc .server = self ._original_func
245356
246357
358+ class GrpcAioInstrumentorServer (BaseInstrumentor ):
359+ """
360+ Globally instrument the grpc.aio server.
361+
362+ Usage::
363+
364+ grpc_aio_server_instrumentor = GrpcAioInstrumentorServer()
365+ grpc_aio_server_instrumentor.instrument()
366+
367+ """
368+
369+ # pylint:disable=attribute-defined-outside-init, redefined-outer-name
370+
371+ def __init__ (self , filter_ = None ):
372+ excluded_service_filter = _excluded_service_filter ()
373+ if excluded_service_filter is not None :
374+ if filter_ is None :
375+ filter_ = excluded_service_filter
376+ else :
377+ filter_ = any_of (filter_ , excluded_service_filter )
378+ self ._filter = filter_
379+
380+ def instrumentation_dependencies (self ) -> Collection [str ]:
381+ return _instruments
382+
383+ def _instrument (self , ** kwargs ):
384+ self ._original_func = grpc .aio .server
385+ tracer_provider = kwargs .get ("tracer_provider" )
386+
387+ def server (* args , ** kwargs ):
388+ if "interceptors" in kwargs :
389+ # add our interceptor as the first
390+ kwargs ["interceptors" ].insert (
391+ 0 ,
392+ aio_server_interceptor (
393+ tracer_provider = tracer_provider , filter_ = self ._filter
394+ ),
395+ )
396+ else :
397+ kwargs ["interceptors" ] = [
398+ aio_server_interceptor (
399+ tracer_provider = tracer_provider , filter_ = self ._filter
400+ )
401+ ]
402+ return self ._original_func (* args , ** kwargs )
403+
404+ grpc .aio .server = server
405+
406+ def _uninstrument (self , ** kwargs ):
407+ grpc .aio .server = self ._original_func
408+
409+
247410class GrpcInstrumentorClient (BaseInstrumentor ):
248411 """
249412 Globally instrument the grpc client
@@ -315,6 +478,69 @@ def wrapper_fn(self, original_func, instance, args, kwargs):
315478 )
316479
317480
481+ class GrpcAioInstrumentorClient (BaseInstrumentor ):
482+ """
483+ Globally instrument the grpc.aio client.
484+
485+ Usage::
486+
487+ grpc_aio_client_instrumentor = GrpcAioInstrumentorClient()
488+ grpc_aio_client_instrumentor.instrument()
489+
490+ """
491+
492+ # pylint:disable=attribute-defined-outside-init, redefined-outer-name
493+
494+ def __init__ (self , filter_ = None ):
495+ excluded_service_filter = _excluded_service_filter ()
496+ if excluded_service_filter is not None :
497+ if filter_ is None :
498+ filter_ = excluded_service_filter
499+ else :
500+ filter_ = any_of (filter_ , excluded_service_filter )
501+ self ._filter = filter_
502+
503+ def instrumentation_dependencies (self ) -> Collection [str ]:
504+ return _instruments
505+
506+ def _add_interceptors (self , tracer_provider , kwargs ):
507+ if "interceptors" in kwargs and kwargs ["interceptors" ]:
508+ kwargs ["interceptors" ] = (
509+ aio_client_interceptors (
510+ tracer_provider = tracer_provider , filter_ = self ._filter
511+ )
512+ + kwargs ["interceptors" ]
513+ )
514+ else :
515+ kwargs ["interceptors" ] = aio_client_interceptors (
516+ tracer_provider = tracer_provider , filter_ = self ._filter
517+ )
518+
519+ return kwargs
520+
521+ def _instrument (self , ** kwargs ):
522+ self ._original_insecure = grpc .aio .insecure_channel
523+ self ._original_secure = grpc .aio .secure_channel
524+ tracer_provider = kwargs .get ("tracer_provider" )
525+
526+ def insecure (* args , ** kwargs ):
527+ kwargs = self ._add_interceptors (tracer_provider , kwargs )
528+
529+ return self ._original_insecure (* args , ** kwargs )
530+
531+ def secure (* args , ** kwargs ):
532+ kwargs = self ._add_interceptors (tracer_provider , kwargs )
533+
534+ return self ._original_secure (* args , ** kwargs )
535+
536+ grpc .aio .insecure_channel = insecure
537+ grpc .aio .secure_channel = secure
538+
539+ def _uninstrument (self , ** kwargs ):
540+ grpc .aio .insecure_channel = self ._original_insecure
541+ grpc .aio .secure_channel = self ._original_secure
542+
543+
318544def client_interceptor (tracer_provider = None , filter_ = None ):
319545 """Create a gRPC client channel interceptor.
320546
@@ -355,6 +581,45 @@ def server_interceptor(tracer_provider=None, filter_=None):
355581 return _server .OpenTelemetryServerInterceptor (tracer , filter_ = filter_ )
356582
357583
584+ def aio_client_interceptors (tracer_provider = None , filter_ = None ):
585+ """Create a gRPC client channel interceptor.
586+
587+ Args:
588+ tracer: The tracer to use to create client-side spans.
589+
590+ Returns:
591+ An invocation-side interceptor object.
592+ """
593+ from . import _aio_client
594+
595+ tracer = trace .get_tracer (__name__ , __version__ , tracer_provider )
596+
597+ return [
598+ _aio_client .UnaryUnaryAioClientInterceptor (tracer , filter_ = filter_ ),
599+ _aio_client .UnaryStreamAioClientInterceptor (tracer , filter_ = filter_ ),
600+ _aio_client .StreamUnaryAioClientInterceptor (tracer , filter_ = filter_ ),
601+ _aio_client .StreamStreamAioClientInterceptor (tracer , filter_ = filter_ ),
602+ ]
603+
604+
605+ def aio_server_interceptor (tracer_provider = None , filter_ = None ):
606+ """Create a gRPC aio server interceptor.
607+
608+ Args:
609+ tracer: The tracer to use to create server-side spans.
610+
611+ Returns:
612+ A service-side interceptor object.
613+ """
614+ from . import _aio_server
615+
616+ tracer = trace .get_tracer (__name__ , __version__ , tracer_provider )
617+
618+ return _aio_server .OpenTelemetryAioServerInterceptor (
619+ tracer , filter_ = filter_
620+ )
621+
622+
358623def _excluded_service_filter () -> Union [Callable [[object ], bool ], None ]:
359624 services = _parse_services (
360625 os .environ .get ("OTEL_PYTHON_GRPC_EXCLUDED_SERVICES" , "" )
0 commit comments