Skip to content

Commit 52598a8

Browse files
sdk/trace: add SpanProcessor
SpanProcessor is an interface that allows to register hooks for Span start and end invocations. This commit adds the SpanProcessor interface to the SDK as well as the MultiSpanProcessor that allows to register multiple processors.
1 parent 9aba630 commit 52598a8

File tree

2 files changed

+177
-1
lines changed

2 files changed

+177
-1
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,70 @@ def from_map(cls, maxlen, mapping):
145145
Link = namedtuple("Link", ("context", "attributes"))
146146

147147

148+
class SpanProcessor:
149+
"""Interface which allows hooks for SDK's `Span`s start and end method
150+
invocations.
151+
152+
Span processors can be registered directly using
153+
:func:`~Tracer:add_span_processor` and they are invoked in the same order
154+
as they were registered.
155+
"""
156+
157+
def on_start(self, span: "Span") -> None:
158+
"""Called when a :class:`Span` is started.
159+
160+
This method is called synchronously on the thread that starts the
161+
span, therefore it should not block or throw an exception.
162+
163+
Args:
164+
span: The :class:`Span` that just started.
165+
"""
166+
167+
def on_end(self, span: "Span") -> None:
168+
"""Called when a :class:`Span` is ended.
169+
170+
This method is called synchronously on the thread that ends the
171+
span, therefore it should not block or throw an exception.
172+
173+
Args:
174+
span: The :class:`Span` that just ended.
175+
"""
176+
177+
def shutdown(self) -> None:
178+
"""Called when a :class:`Tracer` is shutdown."""
179+
180+
181+
# Used when no SpanProcessor has been added to the Tracer
182+
_NO_OP_SPAN_PROCESSOR = SpanProcessor()
183+
184+
185+
class MultiSpanProcessor(SpanProcessor):
186+
"""Implementation of :class:`SpanProcessor` that forwards all received
187+
events to a list of `SpanProcessor`.
188+
"""
189+
190+
def __init__(self):
191+
# use a tuple to avoid race conditions when adding a new span and
192+
# interating throw it on "on_start" and "on_end".
193+
self._span_processors = ()
194+
195+
def add_span_processor(self, span_processor: SpanProcessor):
196+
"""Adds a SpanProcessor to the list handled by this instance."""
197+
self._span_processors = self._span_processors + (span_processor,)
198+
199+
def on_start(self, span: "Span") -> None:
200+
for sp in self._span_processors:
201+
sp.on_start(span)
202+
203+
def on_end(self, span: "Span") -> None:
204+
for sp in self._span_processors:
205+
sp.on_end(span)
206+
207+
def on_shutdown(self) -> None:
208+
for sp in self._span_processors:
209+
sp.shutdown()
210+
211+
148212
class Span(trace_api.Span):
149213
"""See `opentelemetry.trace.Span`.
150214
@@ -161,6 +225,8 @@ class Span(trace_api.Span):
161225
attributes: The span's attributes to be exported
162226
events: Timestamped events to be exported
163227
links: Links to other spans to be exported
228+
span_processor: `SpanProcessor` to invoke when starting and ending
229+
this `Span`.
164230
"""
165231

166232
# Initialize these lazily assuming most spans won't have them.
@@ -179,6 +245,7 @@ def __init__(
179245
attributes: types.Attributes = None, # TODO
180246
events: typing.Sequence[Event] = None, # TODO
181247
links: typing.Sequence[Link] = None, # TODO
248+
span_processor: SpanProcessor = _NO_OP_SPAN_PROCESSOR,
182249
) -> None:
183250

184251
self.name = name
@@ -190,6 +257,7 @@ def __init__(
190257
self.attributes = attributes
191258
self.events = events
192259
self.links = links
260+
self.span_processor = span_processor
193261

194262
if attributes is None:
195263
self.attributes = Span.empty_attributes
@@ -247,10 +315,12 @@ def add_link(
247315
def start(self):
248316
if self.start_time is None:
249317
self.start_time = util.time_ns()
318+
self.span_processor.on_start(self)
250319

251320
def end(self):
252321
if self.end_time is None:
253322
self.end_time = util.time_ns()
323+
self.span_processor.on_end(self)
254324

255325
def update_name(self, name: str) -> None:
256326
self.name = name
@@ -286,6 +356,8 @@ def __init__(self, name: str = "") -> None:
286356
if name:
287357
slot_name = "{}.current_span".format(name)
288358
self._current_span_slot = Context.register_slot(slot_name)
359+
self._active_span_processor = _NO_OP_SPAN_PROCESSOR
360+
self._lock = threading.Lock()
289361

290362
def get_current_span(self):
291363
"""See `opentelemetry.trace.Tracer.get_current_span`."""
@@ -325,7 +397,12 @@ def create_span(
325397
parent_context.trace_options,
326398
parent_context.trace_state,
327399
)
328-
return Span(name=name, context=context, parent=parent)
400+
return Span(
401+
name=name,
402+
context=context,
403+
parent=parent,
404+
span_processor=self._active_span_processor,
405+
)
329406

330407
@contextmanager
331408
def use_span(self, span: "Span") -> typing.Iterator["Span"]:
@@ -339,5 +416,15 @@ def use_span(self, span: "Span") -> typing.Iterator["Span"]:
339416
self._current_span_slot.set(span_snapshot)
340417
span.end()
341418

419+
def add_span_processor(self, span_processor: SpanProcessor) -> None:
420+
"""Registers a new :class:`SpanProcessor` for this `Tracer`.
421+
422+
The span processors are invoked in the same order they are registered.
423+
"""
424+
with self._lock:
425+
if self._active_span_processor is _NO_OP_SPAN_PROCESSOR:
426+
self._active_span_processor = MultiSpanProcessor()
427+
self._active_span_processor.add_span_processor(span_processor)
428+
342429

343430
tracer = Tracer()

opentelemetry-sdk/tests/trace/test_trace.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,3 +198,92 @@ class TestSpan(unittest.TestCase):
198198
def test_basic_span(self):
199199
span = trace.Span("name", mock.Mock(spec=trace_api.SpanContext))
200200
self.assertEqual(span.name, "name")
201+
202+
203+
class TestSpanProcessor(unittest.TestCase):
204+
def test_span_processor(self):
205+
def span_event_start_fmt(span_processor_name, span_name):
206+
return span_processor_name + ":" + span_name + ":start"
207+
208+
def span_event_end_fmt(span_processor_name, span_name):
209+
return span_processor_name + ":" + span_name + ":end"
210+
211+
class MySpanProcessor(trace.SpanProcessor):
212+
def __init__(self, name, span_list):
213+
self.name = name
214+
self.span_list = span_list
215+
216+
def on_start(self, span: "trace.Span") -> None:
217+
self.span_list.append(
218+
span_event_start_fmt(self.name, span.name)
219+
)
220+
221+
def on_end(self, span: "trace.Span") -> None:
222+
self.span_list.append(span_event_end_fmt(self.name, span.name))
223+
224+
tracer = trace.Tracer()
225+
226+
spans_calls_list = [] # filled by MySpanProcessor
227+
expected_list = [] # filled by hand
228+
229+
# Span processors are created but not added to the tracer yet
230+
sp1 = MySpanProcessor("SP1", spans_calls_list)
231+
sp2 = MySpanProcessor("SP2", spans_calls_list)
232+
233+
with tracer.start_span("foo"):
234+
with tracer.start_span("bar"):
235+
with tracer.start_span("baz"):
236+
pass
237+
238+
# at this point lists must be empty
239+
self.assertEqual(len(spans_calls_list), 0)
240+
241+
# add single span processor
242+
tracer.add_span_processor(sp1)
243+
244+
with tracer.start_span("foo"):
245+
expected_list.append(span_event_start_fmt("SP1", "foo"))
246+
247+
with tracer.start_span("bar"):
248+
expected_list.append(span_event_start_fmt("SP1", "bar"))
249+
250+
with tracer.start_span("baz"):
251+
expected_list.append(span_event_start_fmt("SP1", "baz"))
252+
253+
expected_list.append(span_event_end_fmt("SP1", "baz"))
254+
255+
expected_list.append(span_event_end_fmt("SP1", "bar"))
256+
257+
expected_list.append(span_event_end_fmt("SP1", "foo"))
258+
259+
self.assertListEqual(spans_calls_list, expected_list)
260+
261+
spans_calls_list.clear()
262+
expected_list.clear()
263+
264+
# go for multiple span processors
265+
tracer.add_span_processor(sp2)
266+
267+
with tracer.start_span("foo"):
268+
expected_list.append(span_event_start_fmt("SP1", "foo"))
269+
expected_list.append(span_event_start_fmt("SP2", "foo"))
270+
271+
with tracer.start_span("bar"):
272+
expected_list.append(span_event_start_fmt("SP1", "bar"))
273+
expected_list.append(span_event_start_fmt("SP2", "bar"))
274+
275+
with tracer.start_span("baz"):
276+
expected_list.append(span_event_start_fmt("SP1", "baz"))
277+
expected_list.append(span_event_start_fmt("SP2", "baz"))
278+
279+
expected_list.append(span_event_end_fmt("SP1", "baz"))
280+
expected_list.append(span_event_end_fmt("SP2", "baz"))
281+
282+
expected_list.append(span_event_end_fmt("SP1", "bar"))
283+
expected_list.append(span_event_end_fmt("SP2", "bar"))
284+
285+
expected_list.append(span_event_end_fmt("SP1", "foo"))
286+
expected_list.append(span_event_end_fmt("SP2", "foo"))
287+
288+
# compare if two lists are the same
289+
self.assertListEqual(spans_calls_list, expected_list)

0 commit comments

Comments
 (0)