Skip to content

Add integerations for socket and grpc #1911

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
cb107fd
feat(integrations) Implement an integration for tracing socket module
hossein-raeisi Feb 18, 2023
41631e4
feat(tracing) Implement an interceptor for grpc server
hossein-raeisi Feb 19, 2023
5c2db86
feat(tracing) Implement unary-unary and unary-stream interceptors for…
hossein-raeisi Feb 20, 2023
77a2e06
Merge branch 'master' into add-socket-and-grpc-integrations
antonpirker Mar 3, 2023
3e35b16
Fixed httpx mess up
antonpirker Mar 3, 2023
e3194ca
Updated test config
antonpirker Mar 3, 2023
84d5b77
small nitpicking and fixes
antonpirker Mar 3, 2023
4a09bf9
Fixed flake8 errors
antonpirker Mar 3, 2023
01a51d3
feat(tracing) Add type hints for grpc and socket integrations
hossein-raeisi Mar 4, 2023
3bee0f9
Merge branch 'master' into add-socket-and-grpc-integrations
antonpirker Mar 28, 2023
b0ac211
Merge branch 'master' into add-socket-and-grpc-integrations
antonpirker Mar 28, 2023
8a8786f
Some smaller fixes
antonpirker Mar 28, 2023
044cdfb
Merge branch 'add-socket-and-grpc-integrations' of https://github.com…
antonpirker Mar 28, 2023
288aaf6
Updated test setup
antonpirker Mar 28, 2023
6f53860
Fixed typoing
antonpirker Mar 29, 2023
64d13cf
Fixed some tests
antonpirker Mar 29, 2023
4d112f5
The old way is better
antonpirker Mar 29, 2023
f94a6f1
formatting
antonpirker Mar 29, 2023
ed2a66e
Fixed some tests
antonpirker Mar 29, 2023
82d06d8
Fixed some tests
antonpirker Mar 29, 2023
c31339d
Some polishing
antonpirker Mar 29, 2023
ec855c1
Added missing file
antonpirker Mar 29, 2023
f2bfa08
Fixed linting again
antonpirker Mar 29, 2023
8bea90b
Moved test requirements into tox.ini
antonpirker Mar 29, 2023
5eed852
Test Grpc only in Python 3.7+
antonpirker Mar 29, 2023
939b184
Updated test setup
antonpirker Mar 29, 2023
e1bef26
Trigger ci (because of codecov)
antonpirker Mar 29, 2023
d81fe6c
refactor(tracing) Rename grpc's test proto content
hossein-raeisi Mar 29, 2023
aadf76d
Fixed linting
antonpirker Mar 30, 2023
6cefb69
Trigger ci (because of codecov)
antonpirker Mar 30, 2023
7b0af47
Renamed integration to fix import clashes in python 2.7
antonpirker Mar 30, 2023
4cff7d9
shame on me
antonpirker Mar 30, 2023
5a17619
Fixed unicode problem
antonpirker Mar 30, 2023
e181206
Prevent linter from breaking tests
antonpirker Mar 30, 2023
fb5e001
.
antonpirker Mar 30, 2023
6e19ecc
Reverted accidental change
antonpirker Mar 30, 2023
815b7ef
This can not be the solution, something else is wrong.
antonpirker Mar 30, 2023
53ae75d
Renamed _socket back to socket to try if it was a GH problem
antonpirker Mar 30, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,7 @@ extend-ignore =
# is a worse version of and conflicts with B902 (first argument of a classmethod should be named cls)
N804,
extend-exclude=checkouts,lol*
exclude =
# gRCP generated files
grpc_test_service_pb2.py
grpc_test_service_pb2_grpc.py
73 changes: 73 additions & 0 deletions .github/workflows/test-integration-grpc.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
name: Test grpc

on:
push:
branches:
- master
- release/**

pull_request:

# Cancel in progress workflows on pull_requests.
# https://docs.github.com/en/actions/using-jobs/using-concurrency#example-using-a-fallback-value
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true

permissions:
contents: read

env:
BUILD_CACHE_KEY: ${{ github.sha }}
CACHED_BUILD_PATHS: |
${{ github.workspace }}/dist-serverless

jobs:
test:
name: grpc, python ${{ matrix.python-version }}, ${{ matrix.os }}
runs-on: ${{ matrix.os }}
timeout-minutes: 45

strategy:
fail-fast: false
matrix:
python-version: ["3.7","3.8","3.9","3.10","3.11"]
# python3.6 reached EOL and is no longer being supported on
# new versions of hosted runners on Github Actions
# ubuntu-20.04 is the last version that supported python3.6
# see https://github.com/actions/setup-python/issues/544#issuecomment-1332535877
os: [ubuntu-20.04]

steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}

- name: Setup Test Env
run: |
pip install codecov "tox>=3,<4"

- name: Test grpc
timeout-minutes: 45
shell: bash
run: |
set -x # print commands that are executed
coverage erase

./scripts/runtox.sh "py${{ matrix.python-version }}-grpc" --cov=tests --cov=sentry_sdk --cov-report= --cov-branch
coverage combine .coverage*
coverage xml -i
codecov --file coverage.xml

check_required_tests:
name: All grpc tests passed or skipped
needs: test
# Always run this, even if a dependent job failed
if: always()
runs-on: ubuntu-20.04
steps:
- name: Check for failures
if: contains(needs.test.result, 'failure')
run: |
echo "One of the dependent jobs have failed. You may need to re-run it." && exit 1
2 changes: 2 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,5 @@ ignore_missing_imports = True
ignore_missing_imports = True
[mypy-arq.*]
ignore_missing_imports = True
[mypy-grpc.*]
ignore_missing_imports = True
4 changes: 4 additions & 0 deletions sentry_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class OP:
FUNCTION = "function"
FUNCTION_AWS = "function.aws"
FUNCTION_GCP = "function.gcp"
GRPC_CLIENT = "grpc.client"
GRPC_SERVER = "grpc.server"
HTTP_CLIENT = "http.client"
HTTP_CLIENT_STREAM = "http.client.stream"
HTTP_SERVER = "http.server"
Expand All @@ -83,6 +85,8 @@ class OP:
VIEW_RENDER = "view.render"
VIEW_RESPONSE_RENDER = "view.response.render"
WEBSOCKET_SERVER = "websocket.server"
SOCKET_CONNECTION = "socket.connection"
SOCKET_DNS = "socket.dns"


# This type exists to trick mypy and PyCharm into thinking `init` and `Client`
Expand Down
2 changes: 2 additions & 0 deletions sentry_sdk/integrations/grpc/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .server import ServerInterceptor # noqa: F401
from .client import ClientInterceptor # noqa: F401
82 changes: 82 additions & 0 deletions sentry_sdk/integrations/grpc/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from sentry_sdk import Hub
from sentry_sdk._types import MYPY
from sentry_sdk.consts import OP
from sentry_sdk.integrations import DidNotEnable

if MYPY:
from typing import Any, Callable, Iterator, Iterable, Union

try:
import grpc
from grpc import ClientCallDetails, Call
from grpc._interceptor import _UnaryOutcome
from grpc.aio._interceptor import UnaryStreamCall
from google.protobuf.message import Message # type: ignore
except ImportError:
raise DidNotEnable("grpcio is not installed")


class ClientInterceptor(
grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor # type: ignore
):
def intercept_unary_unary(self, continuation, client_call_details, request):
# type: (ClientInterceptor, Callable[[ClientCallDetails, Message], _UnaryOutcome], ClientCallDetails, Message) -> _UnaryOutcome
hub = Hub.current
method = client_call_details.method

with hub.start_span(
op=OP.GRPC_CLIENT, description="unary unary call to %s" % method
) as span:
span.set_data("type", "unary unary")
span.set_data("method", method)

client_call_details = self._update_client_call_details_metadata_from_hub(
client_call_details, hub
)

response = continuation(client_call_details, request)
span.set_data("code", response.code().name)

return response

def intercept_unary_stream(self, continuation, client_call_details, request):
# type: (ClientInterceptor, Callable[[ClientCallDetails, Message], Union[Iterable[Any], UnaryStreamCall]], ClientCallDetails, Message) -> Union[Iterator[Message], Call]
hub = Hub.current
method = client_call_details.method

with hub.start_span(
op=OP.GRPC_CLIENT, description="unary stream call to %s" % method
) as span:
span.set_data("type", "unary stream")
span.set_data("method", method)

client_call_details = self._update_client_call_details_metadata_from_hub(
client_call_details, hub
)

response = continuation(
client_call_details, request
) # type: UnaryStreamCall
span.set_data("code", response.code().name)

return response

@staticmethod
def _update_client_call_details_metadata_from_hub(client_call_details, hub):
# type: (ClientCallDetails, Hub) -> ClientCallDetails
metadata = (
list(client_call_details.metadata) if client_call_details.metadata else []
)
for key, value in hub.iter_trace_propagation_headers():
metadata.append((key, value))

client_call_details = grpc._interceptor._ClientCallDetails(
method=client_call_details.method,
timeout=client_call_details.timeout,
metadata=metadata,
credentials=client_call_details.credentials,
wait_for_ready=client_call_details.wait_for_ready,
compression=client_call_details.compression,
)

return client_call_details
64 changes: 64 additions & 0 deletions sentry_sdk/integrations/grpc/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from sentry_sdk import Hub
from sentry_sdk._types import MYPY
from sentry_sdk.consts import OP
from sentry_sdk.integrations import DidNotEnable
from sentry_sdk.tracing import Transaction, TRANSACTION_SOURCE_CUSTOM

if MYPY:
from typing import Callable, Optional
from google.protobuf.message import Message # type: ignore

try:
import grpc
from grpc import ServicerContext, HandlerCallDetails, RpcMethodHandler
except ImportError:
raise DidNotEnable("grpcio is not installed")


class ServerInterceptor(grpc.ServerInterceptor): # type: ignore
def __init__(self, find_name=None):
# type: (ServerInterceptor, Optional[Callable[[ServicerContext], str]]) -> None
self._find_method_name = find_name or ServerInterceptor._find_name

super(ServerInterceptor, self).__init__()

def intercept_service(self, continuation, handler_call_details):
# type: (ServerInterceptor, Callable[[HandlerCallDetails], RpcMethodHandler], HandlerCallDetails) -> RpcMethodHandler
handler = continuation(handler_call_details)
if not handler or not handler.unary_unary:
return handler

def behavior(request, context):
# type: (Message, ServicerContext) -> Message
hub = Hub(Hub.current)

name = self._find_method_name(context)

if name:
metadata = dict(context.invocation_metadata())

transaction = Transaction.continue_from_headers(
metadata,
op=OP.GRPC_SERVER,
name=name,
source=TRANSACTION_SOURCE_CUSTOM,
)

with hub.start_transaction(transaction=transaction):
try:
return handler.unary_unary(request, context)
except BaseException as e:
raise e
else:
return handler.unary_unary(request, context)

return grpc.unary_unary_rpc_method_handler(
behavior,
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
)

@staticmethod
def _find_name(context):
# type: (ServicerContext) -> str
return context._rpc_event.call_details.method.decode()
89 changes: 89 additions & 0 deletions sentry_sdk/integrations/socket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import socket
from sentry_sdk import Hub
from sentry_sdk._types import MYPY
from sentry_sdk.consts import OP
from sentry_sdk.integrations import Integration

if MYPY:
from socket import AddressFamily, SocketKind
from typing import Tuple, Optional, Union, List

__all__ = ["SocketIntegration"]


class SocketIntegration(Integration):
identifier = "socket"

@staticmethod
def setup_once():
# type: () -> None
"""
patches two of the most used functions of socket: create_connection and getaddrinfo(dns resolver)
"""
_patch_create_connection()
_patch_getaddrinfo()


def _get_span_description(host, port):
# type: (Union[bytes, str, None], Union[str, int, None]) -> str

try:
host = host.decode() # type: ignore
except (UnicodeDecodeError, AttributeError):
pass

description = "%s:%s" % (host, port) # type: ignore

return description


def _patch_create_connection():
# type: () -> None
real_create_connection = socket.create_connection

def create_connection(
address,
timeout=socket._GLOBAL_DEFAULT_TIMEOUT, # type: ignore
source_address=None,
):
# type: (Tuple[Optional[str], int], Optional[float], Optional[Tuple[Union[bytearray, bytes, str], int]])-> socket.socket
hub = Hub.current
if hub.get_integration(SocketIntegration) is None:
return real_create_connection(
address=address, timeout=timeout, source_address=source_address
)

with hub.start_span(
op=OP.SOCKET_CONNECTION,
description=_get_span_description(address[0], address[1]),
) as span:
span.set_data("address", address)
span.set_data("timeout", timeout)
span.set_data("source_address", source_address)

return real_create_connection(
address=address, timeout=timeout, source_address=source_address
)

socket.create_connection = create_connection


def _patch_getaddrinfo():
# type: () -> None
real_getaddrinfo = socket.getaddrinfo

def getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
# type: (Union[bytes, str, None], Union[str, int, None], int, int, int, int) -> List[Tuple[AddressFamily, SocketKind, int, str, Union[Tuple[str, int], Tuple[str, int, int, int]]]]
hub = Hub.current
if hub.get_integration(SocketIntegration) is None:
return real_getaddrinfo(host, port, family, type, proto, flags)

with hub.start_span(
op=OP.SOCKET_DNS, description=_get_span_description(host, port)
) as span:
span.set_data("host", host)
span.set_data("port", port)

return real_getaddrinfo(host, port, family, type, proto, flags)

socket.getaddrinfo = getaddrinfo
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def get_file_text(file_name):
"fastapi": ["fastapi>=0.79.0"],
"pymongo": ["pymongo>=3.1"],
"opentelemetry": ["opentelemetry-distro>=0.35b0"],
"grpcio": ["grpcio>=1.21.1"]
},
classifiers=[
"Development Status :: 5 - Production/Stable",
Expand Down
11 changes: 6 additions & 5 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,20 +311,21 @@ def flush(timeout=None, callback=None):
monkeypatch.setattr(test_client.transport, "capture_event", append)
monkeypatch.setattr(test_client, "flush", flush)

return EventStreamReader(events_r)
return EventStreamReader(events_r, events_w)

return inner


class EventStreamReader(object):
def __init__(self, file):
self.file = file
def __init__(self, read_file, write_file):
self.read_file = read_file
self.write_file = write_file

def read_event(self):
return json.loads(self.file.readline().decode("utf-8"))
return json.loads(self.read_file.readline().decode("utf-8"))

def read_flush(self):
assert self.file.readline() == b"flush\n"
assert self.read_file.readline() == b"flush\n"


# scope=session ensures that fixture is run earlier
Expand Down
3 changes: 3 additions & 0 deletions tests/integrations/grpc/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import pytest

pytest.importorskip("grpc")
11 changes: 11 additions & 0 deletions tests/integrations/grpc/grpc_test_service.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
syntax = "proto3";

package grpc_test_server;

service gRPCTestService{
rpc TestServe(gRPCTestMessage) returns (gRPCTestMessage);
}

message gRPCTestMessage {
string text = 1;
}
Loading