Skip to content

Commit 5c2db86

Browse files
feat(tracing) Implement unary-unary and unary-stream interceptors for grpc stub
Implement gRPC UnaryUnaryClientInterceptor and grpc.UnaryStreamClientInterceptor; starts a span for grpc call and sends propagation headers in metadata
1 parent 41631e4 commit 5c2db86

File tree

4 files changed

+142
-2
lines changed

4 files changed

+142
-2
lines changed

sentry_sdk/consts.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ class OP:
8080
SOCKET_CONNECTION = "socket.connection"
8181
SOCKET_DNS = "socket.dns"
8282
GRPC_SERVER = "grpc.server"
83+
GRPC_CLIENT = "grpc.client"
8384

8485

8586
# This type exists to trick mypy and PyCharm into thinking `init` and `Client`
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
from .server import ServerInterceptor
2+
from .client import ClientInterceptor
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
from sentry_sdk import Hub
2+
from sentry_sdk._types import MYPY
3+
from sentry_sdk.consts import OP
4+
from sentry_sdk.integrations import DidNotEnable
5+
6+
if MYPY:
7+
pass
8+
9+
try:
10+
import grpc
11+
except ImportError:
12+
raise DidNotEnable("grpcio is not installed")
13+
14+
15+
class ClientInterceptor(
16+
grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor
17+
):
18+
def intercept_unary_unary(self, continuation, client_call_details, request):
19+
hub = Hub.current
20+
method = client_call_details.method
21+
22+
with hub.start_span(
23+
op=OP.GRPC_CLIENT, description="unary unary call to %s" % method
24+
) as span:
25+
span.set_data("type", "unary unary")
26+
span.set_data("method", method)
27+
28+
client_call_details = self._update_client_call_details_metadata_from_hub(
29+
client_call_details, hub
30+
)
31+
32+
response = continuation(client_call_details, request)
33+
span.set_data("code", response.code().name)
34+
35+
return response
36+
37+
def intercept_unary_stream(self, continuation, client_call_details, request):
38+
hub = Hub.current
39+
method = client_call_details.method
40+
41+
with hub.start_span(
42+
op=OP.GRPC_CLIENT, description="unary stream call to %s" % method
43+
) as span:
44+
span.set_data("type", "unary stream")
45+
span.set_data("method", method)
46+
47+
client_call_details = self._update_client_call_details_metadata_from_hub(
48+
client_call_details, hub
49+
)
50+
51+
response = continuation(client_call_details, request)
52+
span.set_data("code", response.code().name)
53+
54+
return response
55+
56+
@staticmethod
57+
def _update_client_call_details_metadata_from_hub(client_call_details, hub):
58+
metadata = (
59+
list(client_call_details.metadata) if client_call_details.metadata else []
60+
)
61+
for key, value in hub.iter_trace_propagation_headers():
62+
metadata.append((key, value))
63+
64+
client_call_details = grpc._interceptor._ClientCallDetails(
65+
method=client_call_details.method,
66+
timeout=client_call_details.timeout,
67+
metadata=metadata,
68+
credentials=client_call_details.credentials,
69+
wait_for_ready=client_call_details.wait_for_ready,
70+
compression=client_call_details.compression,
71+
)
72+
73+
return client_call_details

tests/integrations/grpc/test_server.py renamed to tests/integrations/grpc/test_grpc.py

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from sentry_sdk import Hub, start_transaction
99
from sentry_sdk.consts import OP
10+
from sentry_sdk.integrations.grpc.client import ClientInterceptor
1011
from sentry_sdk.integrations.grpc.server import ServerInterceptor
1112
from tests.integrations.grpc.test_service_pb2 import TestMessage
1213
from tests.integrations.grpc.test_service_pb2_grpc import (
@@ -68,8 +69,8 @@ def test_grpc_server_continues_transaction(sentry_init, capture_events_forksafe)
6869
trace_id=transaction.trace_id,
6970
parent_span_id=transaction.span_id,
7071
sampled=1,
71-
)
72-
)
72+
),
73+
),
7374
)
7475
stub.TestServe(TestMessage(text="test"), metadata=metadata)
7576

@@ -88,6 +89,70 @@ def test_grpc_server_continues_transaction(sentry_init, capture_events_forksafe)
8889
assert span["op"] == "test"
8990

9091

92+
@pytest.mark.forked
93+
def test_grpc_client_starts_span(sentry_init, capture_events_forksafe):
94+
sentry_init(traces_sample_rate=1.0)
95+
events = capture_events_forksafe()
96+
interceptors = [ClientInterceptor()]
97+
98+
server = _set_up()
99+
100+
with grpc.insecure_channel(f"localhost:{PORT}") as channel:
101+
channel = grpc.intercept_channel(channel, *interceptors)
102+
stub = TestServiceStub(channel)
103+
104+
with start_transaction():
105+
stub.TestServe(TestMessage(text="test"))
106+
107+
_tear_down(server=server)
108+
109+
events.write_file.close()
110+
events.read_event()
111+
local_transaction = events.read_event()
112+
span = local_transaction["spans"][0]
113+
114+
assert len(local_transaction["spans"]) == 1
115+
assert span["op"] == OP.GRPC_CLIENT
116+
assert (
117+
span["description"]
118+
== "unary unary call to /test_grpc_server.TestService/TestServe"
119+
)
120+
assert span["data"] == {
121+
"type": "unary unary",
122+
"method": "/test_grpc_server.TestService/TestServe",
123+
"code": "OK",
124+
}
125+
126+
127+
@pytest.mark.forked
128+
def test_grpc_client_and_servers_interceptors_integration(
129+
sentry_init, capture_events_forksafe
130+
):
131+
sentry_init(traces_sample_rate=1.0)
132+
events = capture_events_forksafe()
133+
interceptors = [ClientInterceptor()]
134+
135+
server = _set_up()
136+
137+
with grpc.insecure_channel(f"localhost:{PORT}") as channel:
138+
channel = grpc.intercept_channel(channel, *interceptors)
139+
stub = TestServiceStub(channel)
140+
141+
with start_transaction():
142+
stub.TestServe(TestMessage(text="test"))
143+
144+
_tear_down(server=server)
145+
146+
events.write_file.close()
147+
server_transaction = events.read_event()
148+
local_transaction = events.read_event()
149+
150+
assert (
151+
server_transaction["contexts"]["trace"]["trace_id"]
152+
== local_transaction["contexts"]["trace"]["trace_id"]
153+
)
154+
155+
91156
def _set_up():
92157
server = grpc.server(
93158
futures.ThreadPoolExecutor(max_workers=2),

0 commit comments

Comments
 (0)