Skip to content

Commit 974a148

Browse files
sdk/trace: add SpanProcessor
SpanProcessor is an interface that allows to register hooks for Span start and end invocations. This commit adds the SpanProcessor interface to the SDK as well as the MultiSpanProcessor that allows to register multiple processors.
1 parent 9aba630 commit 974a148

File tree

2 files changed

+137
-1
lines changed

2 files changed

+137
-1
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,62 @@ def from_map(cls, maxlen, mapping):
145145
Link = namedtuple("Link", ("context", "attributes"))
146146

147147

148+
class SpanProcessor:
149+
def on_start(self, span: "Span") -> None:
150+
"""Called when a :class:`Span` is started.
151+
152+
This method is called synchronously on the execution thread, therefore
153+
it should not block or throw an exception.
154+
155+
Args:
156+
span: The :class:`Span` that just started.
157+
"""
158+
159+
def on_end(self, span: "Span") -> None:
160+
"""Called when a :class:`Span` is ended.
161+
162+
This method is called synchronously on the execution thread, therefore
163+
it should not block or throw an exception.
164+
165+
Args:
166+
span: The :class:`Span` that just ended.
167+
"""
168+
169+
def shutdown(self) -> None:
170+
"""Called when a :class:`Tracer` is shutdown."""
171+
172+
173+
# Used when no SpanProcessor has been added to the Tracer
174+
_NO_OP_SPAN_PROCESSOR = SpanProcessor()
175+
176+
177+
class MultiSpanProcessor(SpanProcessor):
178+
"""Implementation of :class:`SpanProcessor` that forwards all received
179+
events to a list of `SpanProcessor`.
180+
"""
181+
182+
def __init__(self):
183+
# use a tuple to avoid race conditions when adding a new span and
184+
# interating throw it on "on_start" and "on_end".
185+
self._span_processors = ()
186+
187+
def add_span_processor(self, span_processor: SpanProcessor):
188+
"""Adds a SpanProcessor to the list handled by this instance."""
189+
self._span_processors = self._span_processors + (span_processor,)
190+
191+
def on_start(self, span: "Span") -> None:
192+
for sp in self._span_processors:
193+
sp.on_start(span)
194+
195+
def on_end(self, span: "Span") -> None:
196+
for sp in self._span_processors:
197+
sp.on_end(span)
198+
199+
def on_shutdown(self) -> None:
200+
for sp in self._span_processors:
201+
sp.shutdown()
202+
203+
148204
class Span(trace_api.Span):
149205
"""See `opentelemetry.trace.Span`.
150206
@@ -161,6 +217,8 @@ class Span(trace_api.Span):
161217
attributes: The span's attributes to be exported
162218
events: Timestamped events to be exported
163219
links: Links to other spans to be exported
220+
span_processor: `SpanProcessor` to invoke when starting and ending
221+
this `Span`.
164222
"""
165223

166224
# Initialize these lazily assuming most spans won't have them.
@@ -179,6 +237,7 @@ def __init__(
179237
attributes: types.Attributes = None, # TODO
180238
events: typing.Sequence[Event] = None, # TODO
181239
links: typing.Sequence[Link] = None, # TODO
240+
span_processor: SpanProcessor = _NO_OP_SPAN_PROCESSOR,
182241
) -> None:
183242

184243
self.name = name
@@ -190,6 +249,7 @@ def __init__(
190249
self.attributes = attributes
191250
self.events = events
192251
self.links = links
252+
self.span_processor = span_processor
193253

194254
if attributes is None:
195255
self.attributes = Span.empty_attributes
@@ -247,10 +307,12 @@ def add_link(
247307
def start(self):
248308
if self.start_time is None:
249309
self.start_time = util.time_ns()
310+
self.span_processor.on_start(self)
250311

251312
def end(self):
252313
if self.end_time is None:
253314
self.end_time = util.time_ns()
315+
self.span_processor.on_end(self)
254316

255317
def update_name(self, name: str) -> None:
256318
self.name = name
@@ -286,6 +348,8 @@ def __init__(self, name: str = "") -> None:
286348
if name:
287349
slot_name = "{}.current_span".format(name)
288350
self._current_span_slot = Context.register_slot(slot_name)
351+
self._active_span_processor = _NO_OP_SPAN_PROCESSOR
352+
self._lock = threading.Lock()
289353

290354
def get_current_span(self):
291355
"""See `opentelemetry.trace.Tracer.get_current_span`."""
@@ -325,7 +389,12 @@ def create_span(
325389
parent_context.trace_options,
326390
parent_context.trace_state,
327391
)
328-
return Span(name=name, context=context, parent=parent)
392+
return Span(
393+
name=name,
394+
context=context,
395+
parent=parent,
396+
span_processor=self._active_span_processor,
397+
)
329398

330399
@contextmanager
331400
def use_span(self, span: "Span") -> typing.Iterator["Span"]:
@@ -339,5 +408,15 @@ def use_span(self, span: "Span") -> typing.Iterator["Span"]:
339408
self._current_span_slot.set(span_snapshot)
340409
span.end()
341410

411+
def add_span_processor(self, span_processor: SpanProcessor) -> None:
412+
"""Registers a new :class:`SpanProcessor` for this `Tracer`.
413+
414+
The span processors are invoked in the same order they are registered.
415+
"""
416+
with self._lock:
417+
if self._active_span_processor is _NO_OP_SPAN_PROCESSOR:
418+
self._active_span_processor = MultiSpanProcessor()
419+
self._active_span_processor.add_span_processor(span_processor)
420+
342421

343422
tracer = Tracer()

opentelemetry-sdk/tests/trace/test_trace.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,3 +198,60 @@ class TestSpan(unittest.TestCase):
198198
def test_basic_span(self):
199199
span = trace.Span("name", mock.Mock(spec=trace_api.SpanContext))
200200
self.assertEqual(span.name, "name")
201+
202+
203+
class TestSpanProcessor(unittest.TestCase):
204+
def test_span_processor(self):
205+
def span_event_start_fmt(span_processor_name, span_name):
206+
return span_processor_name + ":" + span_name + ":start"
207+
208+
def span_event_end_fmt(span_processor_name, span_name):
209+
return span_processor_name + ":" + span_name + ":end"
210+
211+
class MySpanProcessor(trace.SpanProcessor):
212+
def __init__(self, name, span_list):
213+
self.name = name
214+
self.span_list = span_list
215+
216+
def on_start(self, span: "trace.Span") -> None:
217+
self.span_list.append(
218+
span_event_start_fmt(self.name, span.name)
219+
)
220+
221+
def on_end(self, span: "trace.Span") -> None:
222+
self.span_list.append(span_event_end_fmt(self.name, span.name))
223+
224+
tracer = trace.Tracer()
225+
226+
spans_calls_list = [] # filled by MySpanProcessor
227+
expected_list = [] # filled by hand
228+
229+
sp1 = MySpanProcessor("SP1", spans_calls_list)
230+
tracer.add_span_processor(sp1)
231+
232+
sp2 = MySpanProcessor("SP2", spans_calls_list)
233+
tracer.add_span_processor(sp2)
234+
235+
with tracer.start_span("foo"):
236+
expected_list.append(span_event_start_fmt("SP1", "foo"))
237+
expected_list.append(span_event_start_fmt("SP2", "foo"))
238+
239+
with tracer.start_span("bar"):
240+
expected_list.append(span_event_start_fmt("SP1", "bar"))
241+
expected_list.append(span_event_start_fmt("SP2", "bar"))
242+
243+
with tracer.start_span("baz"):
244+
expected_list.append(span_event_start_fmt("SP1", "baz"))
245+
expected_list.append(span_event_start_fmt("SP2", "baz"))
246+
247+
expected_list.append(span_event_end_fmt("SP1", "baz"))
248+
expected_list.append(span_event_end_fmt("SP2", "baz"))
249+
250+
expected_list.append(span_event_end_fmt("SP1", "bar"))
251+
expected_list.append(span_event_end_fmt("SP2", "bar"))
252+
253+
expected_list.append(span_event_end_fmt("SP1", "foo"))
254+
expected_list.append(span_event_end_fmt("SP2", "foo"))
255+
256+
# compare if two lists are the same
257+
self.assertListEqual(spans_calls_list, expected_list)

0 commit comments

Comments
 (0)