diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index ff0fa93902..a999f5f743 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -286,7 +286,6 @@ async def serve(): negate, service_name, ) -from opentelemetry.instrumentation.grpc.grpcext import intercept_channel from opentelemetry.instrumentation.grpc.package import _instruments from opentelemetry.instrumentation.grpc.version import __version__ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor @@ -339,7 +338,8 @@ def server(*args, **kwargs): kwargs["interceptors"].insert( 0, server_interceptor( - tracer_provider=tracer_provider, filter_=self._filter + tracer_provider=tracer_provider, + filter_=self._filter, ), ) else: @@ -476,9 +476,10 @@ def wrapper_fn(self, original_func, instance, args, kwargs): tracer_provider = kwargs.get("tracer_provider") request_hook = self._request_hook response_hook = self._response_hook - return intercept_channel( + + return grpc.intercept_channel( channel, - client_interceptor( + *client_interceptors( tracer_provider=tracer_provider, filter_=self._filter, request_hook=request_hook, @@ -560,10 +561,10 @@ def _uninstrument(self, **kwargs): grpc.aio.secure_channel = self._original_secure -def client_interceptor( +def client_interceptors( tracer_provider=None, filter_=None, request_hook=None, response_hook=None ): - """Create a gRPC client channel interceptor. + """Create gRPC client channel interceptors. Args: tracer: The tracer to use to create client-side spans. @@ -573,7 +574,7 @@ def client_interceptor( all requests. Returns: - An invocation-side interceptor object. + A list of invocation-side interceptor objects. """ from . import _client @@ -584,12 +585,32 @@ def client_interceptor( schema_url="https://opentelemetry.io/schemas/1.11.0", ) - return _client.OpenTelemetryClientInterceptor( - tracer, - filter_=filter_, - request_hook=request_hook, - response_hook=response_hook, - ) + return [ + _client.UnaryUnaryClientInterceptor( + tracer, + filter_=filter_, + request_hook=request_hook, + response_hook=response_hook, + ), + _client.UnaryStreamClientInterceptor( + tracer, + filter_=filter_, + request_hook=request_hook, + response_hook=response_hook, + ), + _client.StreamUnaryClientInterceptor( + tracer, + filter_=filter_, + request_hook=request_hook, + response_hook=response_hook, + ), + _client.StreamStreamClientInterceptor( + tracer, + filter_=filter_, + request_hook=request_hook, + response_hook=response_hook, + ), + ] def server_interceptor(tracer_provider=None, filter_=None): diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py index 9c8cc5cdf3..b932ee301d 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py @@ -18,15 +18,15 @@ import grpc from grpc.aio import ClientCallDetails, Metadata -from opentelemetry.instrumentation.grpc._client import ( - OpenTelemetryClientInterceptor, - _carrier_setter, -) +from opentelemetry import trace +from opentelemetry.instrumentation.grpc._client import _carrier_setter from opentelemetry.instrumentation.utils import is_instrumentation_enabled from opentelemetry.propagate import inject -from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.semconv.trace import RpcSystemValues, SpanAttributes from opentelemetry.trace.status import Status, StatusCode +from ._client import _safe_invoke + logger = logging.getLogger(__name__) @@ -52,7 +52,25 @@ def callback(call): return callback -class _BaseAioClientInterceptor(OpenTelemetryClientInterceptor): +class _BaseAioClientInterceptor: + def __init__( + self, tracer, filter_=None, request_hook=None, response_hook=None + ): + self._tracer = tracer + self._filter = filter_ + self._request_hook = request_hook + self._response_hook = response_hook + + def _call_request_hook(self, span, request): + if not callable(self._request_hook): + return + _safe_invoke(self._request_hook, span, request) + + def _call_response_hook(self, span, response): + if not callable(self._response_hook): + return + _safe_invoke(self._response_hook, span, response) + @staticmethod def propagate_trace_in_details(client_call_details: ClientCallDetails): metadata = client_call_details.metadata @@ -101,6 +119,22 @@ def _start_interceptor_span(self, method): set_status_on_exception=False, ) + def _start_span(self, method, **kwargs): + service, meth = method.lstrip("/").split("/", 1) + attributes = { + SpanAttributes.RPC_SYSTEM: RpcSystemValues.GRPC.value, + SpanAttributes.RPC_SERVICE: service, + SpanAttributes.RPC_METHOD: meth, + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[0], + } + + return self._tracer.start_as_current_span( + name=method, + kind=trace.SpanKind.CLIENT, + attributes=attributes, + **kwargs, + ) + async def _wrap_unary_response(self, continuation, span): try: call = await continuation() diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index e27c9e826f..2df0770be9 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -19,19 +19,19 @@ """Implementation of the invocation-side open-telemetry interceptor.""" +import functools import logging from collections import OrderedDict -from typing import Callable, MutableMapping +from typing import Any, Callable, MutableMapping import grpc +from grpc._interceptor import _ClientCallDetails from opentelemetry import trace -from opentelemetry.instrumentation.grpc import grpcext -from opentelemetry.instrumentation.grpc._utilities import RpcInfo from opentelemetry.instrumentation.utils import is_instrumentation_enabled from opentelemetry.propagate import inject from opentelemetry.propagators.textmap import Setter -from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.semconv.trace import RpcSystemValues, SpanAttributes from opentelemetry.trace.status import Status, StatusCode logger = logging.getLogger(__name__) @@ -49,15 +49,31 @@ def set(self, carrier: MutableMapping[str, str], key: str, value: str): _carrier_setter = _CarrierSetter() -def _make_future_done_callback(span, rpc_info): - def callback(response_future): +def _unary_done_callback( + span: trace.Span, hook_callback: Callable[[trace.Span, Any], None] +): + def callback(response_future: grpc.Future): with trace.use_span(span, end_on_exit=True): code = response_future.code() if code != grpc.StatusCode.OK: - rpc_info.error = code - return - response = response_future.result() - rpc_info.response = response + details = response_future.details() + span.set_attribute( + SpanAttributes.RPC_GRPC_STATUS_CODE, code.value[0] + ) + span.set_status( + Status( + status_code=StatusCode.ERROR, + description=f"{code}: {details}", + ) + ) + + try: + span.record_exception(response_future.exception()) + except grpc.FutureCancelledError: + pass + else: + if hook_callback: + hook_callback(span, response_future.result()) return callback @@ -73,9 +89,12 @@ def _safe_invoke(function: Callable, *args): ) -class OpenTelemetryClientInterceptor( - grpcext.UnaryClientInterceptor, grpcext.StreamClientInterceptor -): +class _BaseClientInterceptor: + """Base for client interceptors. + Supplies convenient functions which are required by all four client + interceptors. + """ + def __init__( self, tracer, filter_=None, request_hook=None, response_hook=None ): @@ -84,13 +103,76 @@ def __init__( self._request_hook = request_hook self._response_hook = response_hook + def _call_request_hook(self, span: trace.Span, request): + if not callable(self._request_hook): + return + _safe_invoke(self._request_hook, span, request) + + def _call_response_hook(self, span: trace.Span, response): + if not callable(self._response_hook): + return + _safe_invoke(self._response_hook, span, response) + + @staticmethod + def propagate_trace_in_details(client_call_details): + """Propagates the trace into the metadata of the call. + Args: + client_call_details: The original + :py:class:`~grpc.ClientCallDetails`, describing the outgoing + RPC. + Returns: + An adapted version of the original + :py:class:`~grpc.ClientCallDetails`, describing the outgoing RPC, + whereby the metadata contains the trace ID. + """ + metadata = client_call_details.metadata + if not metadata: + mutable_metadata = OrderedDict() + else: + mutable_metadata = OrderedDict(metadata) + + inject(mutable_metadata, setter=_carrier_setter) + metadata = tuple(mutable_metadata.items()) + + return _ClientCallDetails( + client_call_details.method, + client_call_details.timeout, + metadata, + # credentials, wait_for_ready, and compression, depending on + # grpc-version + *client_call_details[3:], + ) + + @staticmethod + def add_error_details_to_span( + span: trace.Span, + exc: Exception, + ) -> None: + """Adds error and details to an active span. + Args: + span: The active span. + exc: The exception to get code and details from. + """ + if isinstance(exc, grpc.RpcError): + span.set_attribute( + SpanAttributes.RPC_GRPC_STATUS_CODE, + exc.code().value[0], + ) + span.set_status( + Status( + status_code=StatusCode.ERROR, + description=f"{type(exc).__name__}: {exc}", + ) + ) + span.record_exception(exc) + def _start_span(self, method, **kwargs): service, meth = method.lstrip("/").split("/", 1) attributes = { - SpanAttributes.RPC_SYSTEM: "grpc", - SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[0], - SpanAttributes.RPC_METHOD: meth, + SpanAttributes.RPC_SYSTEM: RpcSystemValues.GRPC.value, SpanAttributes.RPC_SERVICE: service, + SpanAttributes.RPC_METHOD: meth, + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[0], } return self._tracer.start_as_current_span( @@ -100,136 +182,177 @@ def _start_span(self, method, **kwargs): **kwargs, ) - # pylint:disable=no-self-use - def _trace_result(self, span, rpc_info, result): + def _wrap_unary_response(self, span: trace.Span, continuation): + """Wraps a unary-response-RPC to record a possible exception. + + Args: + span: The active span. + continuation: A callable which is created by: + + .. code-block:: python + + functools.partial( + continuation, client_call_details, request_or_iterator + ) + + Returns: + The response if the RPC is called synchronously, or the + :py:class:`~grpc.Future` if the RPC is called asynchronously. + """ + response_future = None + try: + response_future = continuation() + except Exception as exc: + self.add_error_details_to_span(span, exc) + raise exc + finally: + if not response_future: + span.end() + # If the RPC is called asynchronously, add a callback to end the span # when the future is done, else end the span immediately - if isinstance(result, grpc.Future): - result.add_done_callback( - _make_future_done_callback(span, rpc_info) + if isinstance(response_future, grpc.Future): + response_future.add_done_callback( + _unary_done_callback( + span, + self._call_response_hook if self._response_hook else None, + ) ) - return result - response = result - # Handle the case when the RPC is initiated via the with_call - # method and the result is a tuple with the first element as the - # response. - # http://www.grpc.io/grpc/python/grpc.html#grpc.UnaryUnaryMultiCallable.with_call - if isinstance(result, tuple): - response = result[0] - rpc_info.response = response + return response_future + if self._response_hook: - self._call_response_hook(span, response) + self._call_response_hook(span, response_future) span.end() - return result + return response_future + + def _wrap_stream_response(self, span: trace.Span, call): + """Wraps a stream-response-RPC to record a possible exception. + + Args: + span: The active span. + call: The response iterator which is created by: + + .. code-block:: python + + continuation(client_call_details, request_or_iterator) + + Returns: + The response iterator. + """ + try: + yield from call + except Exception as exc: + self.add_error_details_to_span(span, exc) + raise exc + finally: + span.end() - def _intercept(self, request, metadata, client_info, invoker): + +class UnaryUnaryClientInterceptor( + grpc.UnaryUnaryClientInterceptor, + _BaseClientInterceptor, +): + def intercept_unary_unary( + self, continuation, client_call_details, request + ): if not is_instrumentation_enabled(): - return invoker(request, metadata) + return continuation(client_call_details, request) + + if self._filter is not None and not self._filter(client_call_details): + return continuation(client_call_details, request) - if not metadata: - mutable_metadata = OrderedDict() - else: - mutable_metadata = OrderedDict(metadata) with self._start_span( - client_info.full_method, + client_call_details.method, end_on_exit=False, record_exception=False, set_status_on_exception=False, ) as span: - result = None - try: - inject(mutable_metadata, setter=_carrier_setter) - metadata = tuple(mutable_metadata.items()) - - rpc_info = RpcInfo( - full_method=client_info.full_method, - metadata=metadata, - timeout=client_info.timeout, - request=request, - ) - if self._request_hook: - self._call_request_hook(span, request) - result = invoker(request, metadata) - except Exception as exc: - if isinstance(exc, grpc.RpcError): - span.set_attribute( - SpanAttributes.RPC_GRPC_STATUS_CODE, - exc.code().value[0], - ) - span.set_status( - Status( - status_code=StatusCode.ERROR, - description=f"{type(exc).__name__}: {exc}", - ) - ) - span.record_exception(exc) - raise exc - finally: - if not result: - span.end() - return self._trace_result(span, rpc_info, result) - - def _call_request_hook(self, span, request): - if not callable(self._request_hook): - return - _safe_invoke(self._request_hook, span, request) + if self._request_hook: + self._call_request_hook(span, request) - def _call_response_hook(self, span, response): - if not callable(self._response_hook): - return - _safe_invoke(self._response_hook, span, response) + new_details = self.propagate_trace_in_details(client_call_details) + + continuation_with_args = functools.partial( + continuation, new_details, request + ) + + return self._wrap_unary_response(span, continuation_with_args) - def intercept_unary(self, request, metadata, client_info, invoker): - if self._filter is not None and not self._filter(client_info): - return invoker(request, metadata) - return self._intercept(request, metadata, client_info, invoker) - # For RPCs that stream responses, the result can be a generator. To record - # the span across the generated responses and detect any errors, we wrap - # the result in a new generator that yields the response values. - def _intercept_server_stream( - self, request_or_iterator, metadata, client_info, invoker +class StreamUnaryClientInterceptor( + grpc.StreamUnaryClientInterceptor, + _BaseClientInterceptor, +): + def intercept_stream_unary( + self, continuation, client_call_details, request_iterator ): - if not metadata: - mutable_metadata = OrderedDict() - else: - mutable_metadata = OrderedDict(metadata) + if not is_instrumentation_enabled(): + return continuation(client_call_details, request_iterator) + + if self._filter is not None and not self._filter(client_call_details): + return continuation(client_call_details, request_iterator) - with self._start_span(client_info.full_method) as span: - inject(mutable_metadata, setter=_carrier_setter) - metadata = tuple(mutable_metadata.items()) - rpc_info = RpcInfo( - full_method=client_info.full_method, - metadata=metadata, - timeout=client_info.timeout, + with self._start_span( + client_call_details.method, + end_on_exit=False, + record_exception=False, + set_status_on_exception=False, + ) as span: + new_details = self.propagate_trace_in_details(client_call_details) + + continuation_with_args = functools.partial( + continuation, new_details, request_iterator ) + return self._wrap_unary_response(span, continuation_with_args) - if client_info.is_client_stream: - rpc_info.request = request_or_iterator - try: - yield from invoker(request_or_iterator, metadata) - except grpc.RpcError as err: - span.set_status(Status(StatusCode.ERROR)) - span.set_attribute( - SpanAttributes.RPC_GRPC_STATUS_CODE, err.code().value[0] - ) - raise err +class UnaryStreamClientInterceptor( + grpc.UnaryStreamClientInterceptor, + _BaseClientInterceptor, +): + def intercept_unary_stream( + self, continuation, client_call_details, request + ): + if not is_instrumentation_enabled(): + return continuation(client_call_details, request) + + if self._filter is not None and not self._filter(client_call_details): + return continuation(client_call_details, request) - def intercept_stream( - self, request_or_iterator, metadata, client_info, invoker + with self._start_span( + client_call_details.method, + end_on_exit=False, + record_exception=False, + set_status_on_exception=False, + ) as span: + new_details = self.propagate_trace_in_details(client_call_details) + + resp = continuation(new_details, request) + + return self._wrap_stream_response(span, resp) + + +class StreamStreamClientInterceptor( + grpc.StreamStreamClientInterceptor, + _BaseClientInterceptor, +): + def intercept_stream_stream( + self, continuation, client_call_details, request_iterator ): if not is_instrumentation_enabled(): - return invoker(request_or_iterator, metadata) + return continuation(client_call_details, request_iterator) - if self._filter is not None and not self._filter(client_info): - return invoker(request_or_iterator, metadata) + if self._filter is not None and not self._filter(client_call_details): + return continuation(client_call_details, request_iterator) - if client_info.is_server_stream: - return self._intercept_server_stream( - request_or_iterator, metadata, client_info, invoker - ) + with self._start_span( + client_call_details.method, + end_on_exit=False, + record_exception=False, + set_status_on_exception=False, + ) as span: + new_details = self.propagate_trace_in_details(client_call_details) - return self._intercept( - request_or_iterator, metadata, client_info, invoker - ) + resp = continuation(new_details, request_iterator) + + return self._wrap_stream_response(span, resp) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py index 8a6365b742..19b89c9ac3 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py @@ -19,24 +19,6 @@ from opentelemetry.trace.status import Status, StatusCode -class RpcInfo: - def __init__( - self, - full_method=None, - metadata=None, - timeout=None, - request=None, - response=None, - error=None, - ): - self.full_method = full_method - self.metadata = metadata - self.timeout = timeout - self.request = request - self.response = response - self.error = error - - def _server_status(code, details): error_status = Status( status_code=StatusCode.ERROR, description=f"{code}:{details}" diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/__init__.py deleted file mode 100644 index d5e2549bab..0000000000 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/__init__.py +++ /dev/null @@ -1,125 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# pylint:disable=import-outside-toplevel -# pylint:disable=import-self -# pylint:disable=no-name-in-module - -import abc - - -class UnaryClientInfo(abc.ABC): - """Consists of various information about a unary RPC on the - invocation-side. - - Attributes: - full_method: A string of the full RPC method, i.e., - /package.service/method. - timeout: The length of time in seconds to wait for the computation to - terminate or be cancelled, or None if this method should block until - the computation is terminated or is cancelled no matter how long that - takes. - """ - - -class StreamClientInfo(abc.ABC): - """Consists of various information about a stream RPC on the - invocation-side. - - Attributes: - full_method: A string of the full RPC method, i.e., - /package.service/method. - is_client_stream: Indicates whether the RPC is client-streaming. - is_server_stream: Indicates whether the RPC is server-streaming. - timeout: The length of time in seconds to wait for the computation to - terminate or be cancelled, or None if this method should block until - the computation is terminated or is cancelled no matter how long that - takes. - """ - - -class UnaryClientInterceptor(abc.ABC): - """Affords intercepting unary-unary RPCs on the invocation-side.""" - - @abc.abstractmethod - def intercept_unary(self, request, metadata, client_info, invoker): - """Intercepts unary-unary RPCs on the invocation-side. - - Args: - request: The request value for the RPC. - metadata: Optional :term:`metadata` to be transmitted to the - service-side of the RPC. - client_info: A UnaryClientInfo containing various information about - the RPC. - invoker: The handler to complete the RPC on the client. It is the - interceptor's responsibility to call it. - - Returns: - The result from calling invoker(request, metadata). - """ - raise NotImplementedError() - - -class StreamClientInterceptor(abc.ABC): - """Affords intercepting stream RPCs on the invocation-side.""" - - @abc.abstractmethod - def intercept_stream( - self, request_or_iterator, metadata, client_info, invoker - ): - """Intercepts stream RPCs on the invocation-side. - - Args: - request_or_iterator: The request value for the RPC if - `client_info.is_client_stream` is `false`; otherwise, an iterator of - request values. - metadata: Optional :term:`metadata` to be transmitted to the service-side - of the RPC. - client_info: A StreamClientInfo containing various information about - the RPC. - invoker: The handler to complete the RPC on the client. It is the - interceptor's responsibility to call it. - - Returns: - The result from calling invoker(metadata). - """ - raise NotImplementedError() - - -def intercept_channel(channel, *interceptors): - """Creates an intercepted channel. - - Args: - channel: A Channel. - interceptors: Zero or more UnaryClientInterceptors or - StreamClientInterceptors - - Returns: - A Channel. - - Raises: - TypeError: If an interceptor derives from neither UnaryClientInterceptor - nor StreamClientInterceptor. - """ - from . import _interceptor - - return _interceptor.intercept_channel(channel, *interceptors) - - -__all__ = ( - "UnaryClientInterceptor", - "StreamClientInfo", - "StreamClientInterceptor", - "intercept_channel", -) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py deleted file mode 100644 index c7eec06c99..0000000000 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py +++ /dev/null @@ -1,397 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# pylint:disable=relative-beyond-top-level -# pylint:disable=no-member - -"""Implementation of gRPC Python interceptors.""" - -import collections - -import grpc - -from opentelemetry.instrumentation.grpc import grpcext - - -class _UnaryClientInfo( - collections.namedtuple("_UnaryClientInfo", ("full_method", "timeout")) -): - pass - - -class _StreamClientInfo( - collections.namedtuple( - "_StreamClientInfo", - ("full_method", "is_client_stream", "is_server_stream", "timeout"), - ) -): - pass - - -class _InterceptorUnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): - def __init__(self, method, base_callable, interceptor): - self._method = method - self._base_callable = base_callable - self._interceptor = interceptor - - def __call__( - self, - request, - timeout=None, - metadata=None, - credentials=None, - wait_for_ready=None, - compression=None, - ): - def invoker(request, metadata): - return self._base_callable( - request, - timeout, - metadata, - credentials, - wait_for_ready, - compression, - ) - - client_info = _UnaryClientInfo(self._method, timeout) - return self._interceptor.intercept_unary( - request, metadata, client_info, invoker - ) - - def with_call( - self, - request, - timeout=None, - metadata=None, - credentials=None, - wait_for_ready=None, - compression=None, - ): - def invoker(request, metadata): - return self._base_callable.with_call( - request, - timeout, - metadata, - credentials, - wait_for_ready, - compression, - ) - - client_info = _UnaryClientInfo(self._method, timeout) - return self._interceptor.intercept_unary( - request, metadata, client_info, invoker - ) - - def future( - self, - request, - timeout=None, - metadata=None, - credentials=None, - wait_for_ready=None, - compression=None, - ): - def invoker(request, metadata): - return self._base_callable.future( - request, - timeout, - metadata, - credentials, - wait_for_ready, - compression, - ) - - client_info = _UnaryClientInfo(self._method, timeout) - return self._interceptor.intercept_unary( - request, metadata, client_info, invoker - ) - - -class _InterceptorUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): - def __init__(self, method, base_callable, interceptor): - self._method = method - self._base_callable = base_callable - self._interceptor = interceptor - - def __call__( - self, - request, - timeout=None, - metadata=None, - credentials=None, - wait_for_ready=None, - compression=None, - ): - def invoker(request, metadata): - return self._base_callable( - request, - timeout, - metadata, - credentials, - wait_for_ready, - compression, - ) - - client_info = _StreamClientInfo(self._method, False, True, timeout) - return self._interceptor.intercept_stream( - request, metadata, client_info, invoker - ) - - -class _InterceptorStreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): - def __init__(self, method, base_callable, interceptor): - self._method = method - self._base_callable = base_callable - self._interceptor = interceptor - - def __call__( - self, - request_iterator, - timeout=None, - metadata=None, - credentials=None, - wait_for_ready=None, - compression=None, - ): - def invoker(request_iterator, metadata): - return self._base_callable( - request_iterator, - timeout, - metadata, - credentials, - wait_for_ready, - compression, - ) - - client_info = _StreamClientInfo(self._method, True, False, timeout) - return self._interceptor.intercept_stream( - request_iterator, metadata, client_info, invoker - ) - - def with_call( - self, - request_iterator, - timeout=None, - metadata=None, - credentials=None, - wait_for_ready=None, - compression=None, - ): - def invoker(request_iterator, metadata): - return self._base_callable.with_call( - request_iterator, - timeout, - metadata, - credentials, - wait_for_ready, - compression, - ) - - client_info = _StreamClientInfo(self._method, True, False, timeout) - return self._interceptor.intercept_stream( - request_iterator, metadata, client_info, invoker - ) - - def future( - self, - request_iterator, - timeout=None, - metadata=None, - credentials=None, - wait_for_ready=None, - compression=None, - ): - def invoker(request_iterator, metadata): - return self._base_callable.future( - request_iterator, - timeout, - metadata, - credentials, - wait_for_ready, - compression, - ) - - client_info = _StreamClientInfo(self._method, True, False, timeout) - return self._interceptor.intercept_stream( - request_iterator, metadata, client_info, invoker - ) - - -class _InterceptorStreamStreamMultiCallable(grpc.StreamStreamMultiCallable): - def __init__(self, method, base_callable, interceptor): - self._method = method - self._base_callable = base_callable - self._interceptor = interceptor - - def __call__( - self, - request_iterator, - timeout=None, - metadata=None, - credentials=None, - wait_for_ready=None, - compression=None, - ): - def invoker(request_iterator, metadata): - return self._base_callable( - request_iterator, - timeout, - metadata, - credentials, - wait_for_ready, - compression, - ) - - client_info = _StreamClientInfo(self._method, True, True, timeout) - return self._interceptor.intercept_stream( - request_iterator, metadata, client_info, invoker - ) - - -class _InterceptorChannel(grpc.Channel): - def __init__(self, channel, interceptor): - self._channel = channel - self._interceptor = interceptor - - def subscribe(self, *args, **kwargs): - self._channel.subscribe(*args, **kwargs) - - def unsubscribe(self, *args, **kwargs): - self._channel.unsubscribe(*args, **kwargs) - - def unary_unary( - self, - method, - request_serializer=None, - response_deserializer=None, - _registered_method=False, - ): - if _registered_method: - base_callable = self._channel.unary_unary( - method, - request_serializer, - response_deserializer, - _registered_method, - ) - else: - base_callable = self._channel.unary_unary( - method, request_serializer, response_deserializer - ) - if isinstance(self._interceptor, grpcext.UnaryClientInterceptor): - return _InterceptorUnaryUnaryMultiCallable( - method, base_callable, self._interceptor - ) - return base_callable - - def unary_stream( - self, - method, - request_serializer=None, - response_deserializer=None, - _registered_method=False, - ): - if _registered_method: - base_callable = self._channel.unary_stream( - method, - request_serializer, - response_deserializer, - _registered_method, - ) - else: - base_callable = self._channel.unary_stream( - method, request_serializer, response_deserializer - ) - if isinstance(self._interceptor, grpcext.StreamClientInterceptor): - return _InterceptorUnaryStreamMultiCallable( - method, base_callable, self._interceptor - ) - return base_callable - - def stream_unary( - self, - method, - request_serializer=None, - response_deserializer=None, - _registered_method=False, - ): - if _registered_method: - base_callable = self._channel.stream_unary( - method, - request_serializer, - response_deserializer, - _registered_method, - ) - else: - base_callable = self._channel.stream_unary( - method, request_serializer, response_deserializer - ) - if isinstance(self._interceptor, grpcext.StreamClientInterceptor): - return _InterceptorStreamUnaryMultiCallable( - method, base_callable, self._interceptor - ) - return base_callable - - def stream_stream( - self, - method, - request_serializer=None, - response_deserializer=None, - _registered_method=False, - ): - if _registered_method: - base_callable = self._channel.stream_stream( - method, - request_serializer, - response_deserializer, - _registered_method, - ) - else: - base_callable = self._channel.stream_stream( - method, request_serializer, response_deserializer - ) - if isinstance(self._interceptor, grpcext.StreamClientInterceptor): - return _InterceptorStreamStreamMultiCallable( - method, base_callable, self._interceptor - ) - return base_callable - - def close(self): - if not hasattr(self._channel, "close"): - raise RuntimeError( - "close() is not supported with the installed version of grpcio" - ) - self._channel.close() - - def __enter__(self): - """Enters the runtime context related to the channel object.""" - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """Exits the runtime context related to the channel object.""" - self.close() - - -def intercept_channel(channel, *interceptors): - result = channel - for interceptor in interceptors: - if not isinstance( - interceptor, grpcext.UnaryClientInterceptor - ) and not isinstance(interceptor, grpcext.StreamClientInterceptor): - raise TypeError( - "interceptor must be either a " - "grpcext.UnaryClientInterceptor or a " - "grpcext.StreamClientInterceptor" - ) - result = _InterceptorChannel(result, interceptor) - return result diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py index 9fb922a615..80000bce70 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py @@ -19,10 +19,7 @@ from opentelemetry import trace from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient from opentelemetry.instrumentation.grpc._client import ( - OpenTelemetryClientInterceptor, -) -from opentelemetry.instrumentation.grpc.grpcext._interceptor import ( - _UnaryClientInfo, + UnaryUnaryClientInterceptor, ) from opentelemetry.instrumentation.utils import suppress_instrumentation from opentelemetry.propagate import get_global_textmap, set_global_textmap @@ -39,7 +36,6 @@ ) from ._server import create_test_server from .protobuf import test_server_pb2_grpc -from .protobuf.test_server_pb2 import Request # User defined interceptor. Is used in the tests along with the opentelemetry client interceptor. @@ -83,6 +79,16 @@ def _intercept_call( return continuation(client_call_details, request_or_iterator) +class RecordingInterceptor(grpc.UnaryUnaryClientInterceptor): + recorded_details = None + + def intercept_unary_unary( + self, continuation, client_call_details, request + ): + self.recorded_details = client_call_details + return continuation(client_call_details, request) + + class TestClientProto(TestBase): def setUp(self): super().setUp() @@ -276,30 +282,26 @@ def test_client_interceptor_trace_context_propagation( previous_propagator = get_global_textmap() try: set_global_textmap(MockTextMapPropagator()) - interceptor = OpenTelemetryClientInterceptor(trace.NoOpTracer()) - - carrier = tuple() - - def invoker(request, metadata): - nonlocal carrier - carrier = metadata - return {} - - request = Request(client_id=1, request_data="data") - interceptor.intercept_unary( - request, - {}, - _UnaryClientInfo( - full_method="/GRPCTestServer/SimpleMethod", timeout=None - ), - invoker=invoker, + interceptor = UnaryUnaryClientInterceptor(trace.NoOpTracer()) + + recording_interceptor = RecordingInterceptor() + interceptors = [interceptor, recording_interceptor] + + channel = grpc.intercept_channel( + grpc.insecure_channel("localhost:25565"), *interceptors ) - assert len(carrier) == 2 - assert carrier[0][0] == "mock-traceid" - assert carrier[0][1] == "0" - assert carrier[1][0] == "mock-spanid" - assert carrier[1][1] == "0" + stub = test_server_pb2_grpc.GRPCTestServerStub(channel) + simple_method(stub) + + metadata = recording_interceptor.recorded_details.metadata + assert len(metadata) == 3 + assert metadata[0][0] == "key" + assert metadata[0][1] == "value" + assert metadata[1][0] == "mock-traceid" + assert metadata[1][1] == "0" + assert metadata[2][0] == "mock-spanid" + assert metadata[2][1] == "0" finally: set_global_textmap(previous_propagator) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py index 81e8d708f2..ff7d20b9e2 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py @@ -22,10 +22,7 @@ from opentelemetry import trace from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient, filters from opentelemetry.instrumentation.grpc._client import ( - OpenTelemetryClientInterceptor, -) -from opentelemetry.instrumentation.grpc.grpcext._interceptor import ( - _UnaryClientInfo, + UnaryUnaryClientInterceptor, ) from opentelemetry.instrumentation.utils import suppress_instrumentation from opentelemetry.propagate import get_global_textmap, set_global_textmap @@ -42,7 +39,7 @@ ) from ._server import create_test_server from .protobuf import test_server_pb2_grpc -from .protobuf.test_server_pb2 import Request +from .test_client_interceptor import RecordingInterceptor # User defined interceptor. Is used in the tests along with the opentelemetry client interceptor. @@ -201,30 +198,26 @@ def test_client_interceptor_trace_context_propagation( previous_propagator = get_global_textmap() try: set_global_textmap(MockTextMapPropagator()) - interceptor = OpenTelemetryClientInterceptor(trace.NoOpTracer()) - - carrier = tuple() - - def invoker(request, metadata): - nonlocal carrier - carrier = metadata - return {} - - request = Request(client_id=1, request_data="data") - interceptor.intercept_unary( - request, - {}, - _UnaryClientInfo( - full_method="/GRPCTestServer/SimpleMethod", timeout=None - ), - invoker=invoker, + interceptor = UnaryUnaryClientInterceptor(trace.NoOpTracer()) + + recording_interceptor = RecordingInterceptor() + interceptors = [interceptor, recording_interceptor] + + channel = grpc.intercept_channel( + grpc.insecure_channel("localhost:25565"), *interceptors ) - assert len(carrier) == 2 - assert carrier[0][0] == "mock-traceid" - assert carrier[0][1] == "0" - assert carrier[1][0] == "mock-spanid" - assert carrier[1][1] == "0" + stub = test_server_pb2_grpc.GRPCTestServerStub(channel) + simple_method(stub) + + metadata = recording_interceptor.recorded_details.metadata + assert len(metadata) == 3 + assert metadata[0][0] == "key" + assert metadata[0][1] == "value" + assert metadata[1][0] == "mock-traceid" + assert metadata[1][1] == "0" + assert metadata[2][0] == "mock-spanid" + assert metadata[2][1] == "0" finally: set_global_textmap(previous_propagator) @@ -345,31 +338,26 @@ def test_client_interceptor_trace_context_propagation( previous_propagator = get_global_textmap() try: set_global_textmap(MockTextMapPropagator()) - interceptor = OpenTelemetryClientInterceptor(trace.NoOpTracer()) - - carrier = tuple() - - def invoker(request, metadata): - nonlocal carrier - carrier = metadata - return {} - - request = Request(client_id=1, request_data="data") - interceptor.intercept_unary( - request, - {}, - _UnaryClientInfo( - full_method="/GRPCTestServer/SimpleMethod", timeout=None - ), - invoker=invoker, - ) + interceptor = UnaryUnaryClientInterceptor(trace.NoOpTracer()) - assert len(carrier) == 2 - assert carrier[0][0] == "mock-traceid" - assert carrier[0][1] == "0" - assert carrier[1][0] == "mock-spanid" - assert carrier[1][1] == "0" + recording_interceptor = RecordingInterceptor() + interceptors = [interceptor, recording_interceptor] + channel = grpc.intercept_channel( + grpc.insecure_channel("localhost:25565"), *interceptors + ) + + stub = test_server_pb2_grpc.GRPCTestServerStub(channel) + simple_method(stub) + + metadata = recording_interceptor.recorded_details.metadata + assert len(metadata) == 3 + assert metadata[0][0] == "key" + assert metadata[0][1] == "value" + assert metadata[1][0] == "mock-traceid" + assert metadata[1][1] == "0" + assert metadata[2][0] == "mock-spanid" + assert metadata[2][1] == "0" finally: set_global_textmap(previous_propagator) @@ -608,30 +596,26 @@ def test_client_interceptor_trace_context_propagation( previous_propagator = get_global_textmap() try: set_global_textmap(MockTextMapPropagator()) - interceptor = OpenTelemetryClientInterceptor(trace.NoOpTracer()) - - carrier = tuple() - - def invoker(request, metadata): - nonlocal carrier - carrier = metadata - return {} - - request = Request(client_id=1, request_data="data") - interceptor.intercept_unary( - request, - {}, - _UnaryClientInfo( - full_method="/GRPCTestServer/SimpleMethod", timeout=None - ), - invoker=invoker, + interceptor = UnaryUnaryClientInterceptor(trace.NoOpTracer()) + + recording_interceptor = RecordingInterceptor() + interceptors = [interceptor, recording_interceptor] + + channel = grpc.intercept_channel( + grpc.insecure_channel("localhost:25565"), *interceptors ) - assert len(carrier) == 2 - assert carrier[0][0] == "mock-traceid" - assert carrier[0][1] == "0" - assert carrier[1][0] == "mock-spanid" - assert carrier[1][1] == "0" + stub = test_server_pb2_grpc.GRPCTestServerStub(channel) + simple_method(stub) + + metadata = recording_interceptor.recorded_details.metadata + assert len(metadata) == 3 + assert metadata[0][0] == "key" + assert metadata[0][1] == "value" + assert metadata[1][0] == "mock-traceid" + assert metadata[1][1] == "0" + assert metadata[2][0] == "mock-spanid" + assert metadata[2][1] == "0" finally: set_global_textmap(previous_propagator)