Skip to content

Commit 618fc7a

Browse files
sdk/trace: add SpanProcessor
SpanProcessor is an interface that allows to register hooks for Span start and end invocations. This commit adds the SpanProcessor interface to the SDK as well as the MultiSpanProcessor that allows to register multiple processors.
1 parent 12415f9 commit 618fc7a

File tree

2 files changed

+134
-1
lines changed

2 files changed

+134
-1
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,61 @@ def from_map(cls, maxlen, mapping):
151151
Link = namedtuple('Link', ('context', 'attributes'))
152152

153153

154+
class SpanProcessor():
155+
def on_start(self, span: 'Span') -> None:
156+
"""Called when a :class:`Span` is started.
157+
158+
This method is called synchronously on the execution thread, therefore
159+
it should not block or throw an exception.
160+
161+
Args:
162+
span: The :class:`Span` that just started.
163+
"""
164+
165+
def on_end(self, span: 'Span') -> None:
166+
"""Called when a :class:`Span` is ended.
167+
168+
This method is called synchronously on the execution thread, therefore
169+
it should not block or throw an exception.
170+
171+
Args:
172+
span: The :class:`Span` that just ended.
173+
"""
174+
175+
def shutdown(self) -> None:
176+
"""Called when a :class:`Tracer` is shutdown."""
177+
178+
179+
# Used when no SpanProcessor has been added to the Tracer
180+
_NO_OP_SPAN_PROCESSOR = SpanProcessor()
181+
182+
183+
class MultiSpanProcessor(SpanProcessor):
184+
"""Implementation of :class:`SpanProcessor` that forwards all received
185+
events to a list of `SpanProcessor`.
186+
"""
187+
def __init__(self):
188+
# use a tuple to avoid race conditions when adding a new span and
189+
# interating throw it on "on_start" and "on_end".
190+
self._span_processors = ()
191+
192+
def add_span_processor(self, span_processor: SpanProcessor):
193+
"""Adds a SpanProcessor to the list handled by this instance."""
194+
self._span_processors = self._span_processors + (span_processor, )
195+
196+
def on_start(self, span: 'Span') -> None:
197+
for sp in self._span_processors:
198+
sp.on_start(span)
199+
200+
def on_end(self, span: 'Span') -> None:
201+
for sp in self._span_processors:
202+
sp.on_end(span)
203+
204+
def on_shutdown(self) -> None:
205+
for sp in self._span_processors:
206+
sp.shutdown()
207+
208+
154209
class Span(trace_api.Span):
155210
"""See `opentelemetry.trace.Span`.
156211
@@ -167,6 +222,8 @@ class Span(trace_api.Span):
167222
attributes: The span's attributes to be exported
168223
events: Timestamped events to be exported
169224
links: Links to other spans to be exported
225+
span_processor: `SpanProcessor` to invoke when starting and ending
226+
this `Span`.
170227
"""
171228

172229
# Initialize these lazily assuming most spans won't have them.
@@ -184,6 +241,7 @@ def __init__(self: 'Span',
184241
attributes: types.Attributes = None, # TODO
185242
events: typing.Sequence[Event] = None, # TODO
186243
links: typing.Sequence[Link] = None, # TODO
244+
span_processor: SpanProcessor = _NO_OP_SPAN_PROCESSOR
187245
) -> None:
188246

189247
self.name = name
@@ -195,6 +253,7 @@ def __init__(self: 'Span',
195253
self.attributes = attributes
196254
self.events = events
197255
self.links = links
256+
self.span_processor = span_processor
198257

199258
if attributes is None:
200259
self.attributes = Span.empty_attributes
@@ -256,10 +315,12 @@ def add_link(self: 'Span',
256315
def start(self):
257316
if self.start_time is None:
258317
self.start_time = util.time_ns()
318+
self.span_processor.on_start(self)
259319

260320
def end(self):
261321
if self.end_time is None:
262322
self.end_time = util.time_ns()
323+
self.span_processor.on_end(self)
263324

264325
def update_name(self, name: str) -> None:
265326
self.name = name
@@ -297,6 +358,8 @@ def __init__(self,
297358
if name:
298359
slot_name = '{}.current_span'.format(name)
299360
self._current_span_slot = Context.register_slot(slot_name)
361+
self._active_span_processor = _NO_OP_SPAN_PROCESSOR
362+
self._lock = threading.Lock()
300363

301364
def get_current_span(self):
302365
"""See `opentelemetry.trace.Tracer.get_current_span`."""
@@ -334,7 +397,8 @@ def create_span(self,
334397
span_id,
335398
parent_context.trace_options,
336399
parent_context.trace_state)
337-
return Span(name=name, context=context, parent=parent)
400+
return Span(name=name, context=context, parent=parent,
401+
span_processor=self._active_span_processor)
338402

339403
@contextmanager
340404
def use_span(self, span: 'Span') -> typing.Iterator['Span']:
@@ -348,5 +412,15 @@ def use_span(self, span: 'Span') -> typing.Iterator['Span']:
348412
self._current_span_slot.set(span_snapshot)
349413
span.end()
350414

415+
def add_span_processor(self, span_processor: SpanProcessor) -> None:
416+
"""Registers a new :class:`SpanProcessor` for this `Tracer`.
417+
418+
The span processors are invoked in the same order they are registered.
419+
"""
420+
with self._lock:
421+
if self._active_span_processor is _NO_OP_SPAN_PROCESSOR:
422+
self._active_span_processor = MultiSpanProcessor()
423+
self._active_span_processor.add_span_processor(span_processor)
424+
351425

352426
tracer = Tracer()

opentelemetry-sdk/tests/trace/test_trace.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,3 +187,62 @@ class TestSpan(unittest.TestCase):
187187
def test_basic_span(self):
188188
span = trace.Span('name', mock.Mock(spec=trace_api.SpanContext))
189189
self.assertEqual(span.name, 'name')
190+
191+
192+
class TestSpanProcessor(unittest.TestCase):
193+
194+
def test_span_processor(self):
195+
196+
def span_event_start_fmt(span_processor_name, span_name):
197+
return span_processor_name + ':' + span_name + ':start'
198+
199+
def span_event_end_fmt(span_processor_name, span_name):
200+
return span_processor_name + ':' + span_name + ':end'
201+
202+
class MySpanProcessor(trace.SpanProcessor):
203+
def __init__(self, name, span_list):
204+
self.name = name
205+
self.span_list = span_list
206+
207+
def on_start(self, span: 'trace.Span') -> None:
208+
self.span_list.append(
209+
span_event_start_fmt(self.name, span.name))
210+
211+
def on_end(self, span: 'trace.Span') -> None:
212+
self.span_list.append(
213+
span_event_end_fmt(self.name, span.name))
214+
215+
tracer = trace.Tracer()
216+
217+
spans_calls_list = [] # filled by MySpanProcessor
218+
expected_list = [] # filled by hand
219+
220+
sp1 = MySpanProcessor("SP1", spans_calls_list)
221+
tracer.add_span_processor(sp1)
222+
223+
sp2 = MySpanProcessor("SP2", spans_calls_list)
224+
tracer.add_span_processor(sp2)
225+
226+
with tracer.start_span('foo'):
227+
expected_list.append(span_event_start_fmt('SP1', 'foo'))
228+
expected_list.append(span_event_start_fmt('SP2', 'foo'))
229+
230+
with tracer.start_span('bar'):
231+
expected_list.append(span_event_start_fmt('SP1', 'bar'))
232+
expected_list.append(span_event_start_fmt('SP2', 'bar'))
233+
234+
with tracer.start_span('baz'):
235+
expected_list.append(span_event_start_fmt('SP1', 'baz'))
236+
expected_list.append(span_event_start_fmt('SP2', 'baz'))
237+
238+
expected_list.append(span_event_end_fmt('SP1', 'baz'))
239+
expected_list.append(span_event_end_fmt('SP2', 'baz'))
240+
241+
expected_list.append(span_event_end_fmt('SP1', 'bar'))
242+
expected_list.append(span_event_end_fmt('SP2', 'bar'))
243+
244+
expected_list.append(span_event_end_fmt('SP1', 'foo'))
245+
expected_list.append(span_event_end_fmt('SP2', 'foo'))
246+
247+
# compare if two lists are the same
248+
self.assertListEqual(spans_calls_list, expected_list)

0 commit comments

Comments
 (0)