diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py index 72c5c303469..21acb394945 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py @@ -145,6 +145,68 @@ def from_map(cls, maxlen, mapping): Link = namedtuple("Link", ("context", "attributes")) +class SpanProcessor: + """Interface which allows hooks for SDK's `Span`s start and end method + invocations. + + Span processors can be registered directly using + :func:`~Tracer:add_span_processor` and they are invoked in the same order + as they were registered. + """ + + def on_start(self, span: "Span") -> None: + """Called when a :class:`Span` is started. + + This method is called synchronously on the thread that starts the + span, therefore it should not block or throw an exception. + + Args: + span: The :class:`Span` that just started. + """ + + def on_end(self, span: "Span") -> None: + """Called when a :class:`Span` is ended. + + This method is called synchronously on the thread that ends the + span, therefore it should not block or throw an exception. + + Args: + span: The :class:`Span` that just ended. + """ + + def shutdown(self) -> None: + """Called when a :class:`Tracer` is shutdown.""" + + +class MultiSpanProcessor(SpanProcessor): + """Implementation of :class:`SpanProcessor` that forwards all received + events to a list of `SpanProcessor`. + """ + + def __init__(self): + # use a tuple to avoid race conditions when adding a new span and + # iterating through it on "on_start" and "on_end". + self._span_processors = () + self._lock = threading.Lock() + + def add_span_processor(self, span_processor: SpanProcessor): + """Adds a SpanProcessor to the list handled by this instance.""" + with self._lock: + self._span_processors = self._span_processors + (span_processor,) + + def on_start(self, span: "Span") -> None: + for sp in self._span_processors: + sp.on_start(span) + + def on_end(self, span: "Span") -> None: + for sp in self._span_processors: + sp.on_end(span) + + def shutdown(self) -> None: + for sp in self._span_processors: + sp.shutdown() + + class Span(trace_api.Span): """See `opentelemetry.trace.Span`. @@ -161,6 +223,8 @@ class Span(trace_api.Span): attributes: The span's attributes to be exported events: Timestamped events to be exported links: Links to other spans to be exported + span_processor: `SpanProcessor` to invoke when starting and ending + this `Span`. """ # Initialize these lazily assuming most spans won't have them. @@ -179,6 +243,7 @@ def __init__( attributes: types.Attributes = None, # TODO events: typing.Sequence[Event] = None, # TODO links: typing.Sequence[Link] = None, # TODO + span_processor: SpanProcessor = SpanProcessor(), ) -> None: self.name = name @@ -190,6 +255,7 @@ def __init__( self.attributes = attributes self.events = events self.links = links + self.span_processor = span_processor if attributes is None: self.attributes = Span.empty_attributes @@ -247,10 +313,12 @@ def add_link( def start(self): if self.start_time is None: self.start_time = util.time_ns() + self.span_processor.on_start(self) def end(self): if self.end_time is None: self.end_time = util.time_ns() + self.span_processor.on_end(self) def update_name(self, name: str) -> None: self.name = name @@ -286,6 +354,7 @@ def __init__(self, name: str = "") -> None: if name: slot_name = "{}.current_span".format(name) self._current_span_slot = Context.register_slot(slot_name) + self._active_span_processor = MultiSpanProcessor() def get_current_span(self): """See `opentelemetry.trace.Tracer.get_current_span`.""" @@ -325,7 +394,12 @@ def create_span( parent_context.trace_options, parent_context.trace_state, ) - return Span(name=name, context=context, parent=parent) + return Span( + name=name, + context=context, + parent=parent, + span_processor=self._active_span_processor, + ) @contextmanager def use_span(self, span: "Span") -> typing.Iterator["Span"]: @@ -339,5 +413,15 @@ def use_span(self, span: "Span") -> typing.Iterator["Span"]: self._current_span_slot.set(span_snapshot) span.end() + def add_span_processor(self, span_processor: SpanProcessor) -> None: + """Registers a new :class:`SpanProcessor` for this `Tracer`. + + The span processors are invoked in the same order they are registered. + """ + + # no lock here because MultiSpanProcessor.add_span_processor is + # thread safe + self._active_span_processor.add_span_processor(span_processor) + tracer = Tracer() diff --git a/opentelemetry-sdk/tests/trace/test_trace.py b/opentelemetry-sdk/tests/trace/test_trace.py index 240972344c8..c0a0e65008d 100644 --- a/opentelemetry-sdk/tests/trace/test_trace.py +++ b/opentelemetry-sdk/tests/trace/test_trace.py @@ -198,3 +198,116 @@ class TestSpan(unittest.TestCase): def test_basic_span(self): span = trace.Span("name", mock.Mock(spec=trace_api.SpanContext)) self.assertEqual(span.name, "name") + + +def span_event_start_fmt(span_processor_name, span_name): + return span_processor_name + ":" + span_name + ":start" + + +def span_event_end_fmt(span_processor_name, span_name): + return span_processor_name + ":" + span_name + ":end" + + +class MySpanProcessor(trace.SpanProcessor): + def __init__(self, name, span_list): + self.name = name + self.span_list = span_list + + def on_start(self, span: "trace.Span") -> None: + self.span_list.append(span_event_start_fmt(self.name, span.name)) + + def on_end(self, span: "trace.Span") -> None: + self.span_list.append(span_event_end_fmt(self.name, span.name)) + + +class TestSpanProcessor(unittest.TestCase): + def test_span_processor(self): + tracer = trace.Tracer() + + spans_calls_list = [] # filled by MySpanProcessor + expected_list = [] # filled by hand + + # Span processors are created but not added to the tracer yet + sp1 = MySpanProcessor("SP1", spans_calls_list) + sp2 = MySpanProcessor("SP2", spans_calls_list) + + with tracer.start_span("foo"): + with tracer.start_span("bar"): + with tracer.start_span("baz"): + pass + + # at this point lists must be empty + self.assertEqual(len(spans_calls_list), 0) + + # add single span processor + tracer.add_span_processor(sp1) + + with tracer.start_span("foo"): + expected_list.append(span_event_start_fmt("SP1", "foo")) + + with tracer.start_span("bar"): + expected_list.append(span_event_start_fmt("SP1", "bar")) + + with tracer.start_span("baz"): + expected_list.append(span_event_start_fmt("SP1", "baz")) + + expected_list.append(span_event_end_fmt("SP1", "baz")) + + expected_list.append(span_event_end_fmt("SP1", "bar")) + + expected_list.append(span_event_end_fmt("SP1", "foo")) + + self.assertListEqual(spans_calls_list, expected_list) + + spans_calls_list.clear() + expected_list.clear() + + # go for multiple span processors + tracer.add_span_processor(sp2) + + with tracer.start_span("foo"): + expected_list.append(span_event_start_fmt("SP1", "foo")) + expected_list.append(span_event_start_fmt("SP2", "foo")) + + with tracer.start_span("bar"): + expected_list.append(span_event_start_fmt("SP1", "bar")) + expected_list.append(span_event_start_fmt("SP2", "bar")) + + with tracer.start_span("baz"): + expected_list.append(span_event_start_fmt("SP1", "baz")) + expected_list.append(span_event_start_fmt("SP2", "baz")) + + expected_list.append(span_event_end_fmt("SP1", "baz")) + expected_list.append(span_event_end_fmt("SP2", "baz")) + + expected_list.append(span_event_end_fmt("SP1", "bar")) + expected_list.append(span_event_end_fmt("SP2", "bar")) + + expected_list.append(span_event_end_fmt("SP1", "foo")) + expected_list.append(span_event_end_fmt("SP2", "foo")) + + # compare if two lists are the same + self.assertListEqual(spans_calls_list, expected_list) + + def test_add_span_processor_after_span_creation(self): + tracer = trace.Tracer() + + spans_calls_list = [] # filled by MySpanProcessor + expected_list = [] # filled by hand + + # Span processors are created but not added to the tracer yet + sp = MySpanProcessor("SP1", spans_calls_list) + + with tracer.start_span("foo"): + with tracer.start_span("bar"): + with tracer.start_span("baz"): + # add span processor after spans have been created + tracer.add_span_processor(sp) + + expected_list.append(span_event_end_fmt("SP1", "baz")) + + expected_list.append(span_event_end_fmt("SP1", "bar")) + + expected_list.append(span_event_end_fmt("SP1", "foo")) + + self.assertListEqual(spans_calls_list, expected_list)