Skip to content

sdk/trace: add SpanProcessor #115

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 85 additions & 1 deletion opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,68 @@ def from_map(cls, maxlen, mapping):
Link = namedtuple("Link", ("context", "attributes"))


class SpanProcessor:
"""Interface which allows hooks for SDK's `Span`s start and end method
invocations.

Span processors can be registered directly using
:func:`~Tracer:add_span_processor` and they are invoked in the same order
as they were registered.
"""

def on_start(self, span: "Span") -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we mention this is a SDK Span, not a API Span?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can mention this explicitly too, but the "Span" type annotation here should already refer to the Span in this file.

BTW, please move SpanProcessor after Span, so that the type annotation can be written without making it a string.

Copy link
Member Author

@mauriciovasquezbernal mauriciovasquezbernal Aug 30, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll mention this explicitly in a the SpanProcessor docs.

I had tried to move it there but Span needs _NO_OP_SPAN_PROCESSOR that is an instance of SpanProcessor, so I gave up and preferred to use strings for the annotations.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry if this is covered somewhere, but why would the SpanProcessor take the SDK Span as an object? Shouldn't SpanProcessors be more agnostic of the span implementation?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@toumorokoshi We specify SpanProcessors only in the SDK level, so there can't ever be something different than an SDK span. Also, a plain span would be pretty useless to a span processor since there aren't any properties on it that can be read.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@toumorokoshi as already pointed by @Oberon00 the SpanProcessors are only defined in the SDK level. We discussed in the SIG meeting about passing a readable span (something similar to SpanData) to the SpanProcessors, but we then agreed to pass directly the SDK span for the time being.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And that in the SIG was only about exporters, where this might change in the long term. I imagine SpanProcessors will continue to always just receive a sdk.Span object directly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagine SpanProcessors will continue to always just receive a sdk.Span object directly.

Correct. It has been mentioned the (future) possibility to set properties on the SDK Span from processor code ;)

"""Called when a :class:`Span` is started.

This method is called synchronously on the thread that starts the
span, therefore it should not block or throw an exception.

Args:
span: The :class:`Span` that just started.
"""

def on_end(self, span: "Span") -> None:
"""Called when a :class:`Span` is ended.

This method is called synchronously on the thread that ends the
span, therefore it should not block or throw an exception.

Args:
span: The :class:`Span` that just ended.
"""

def shutdown(self) -> None:
"""Called when a :class:`Tracer` is shutdown."""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we have a shutdown method in our tracer yet?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't.



class MultiSpanProcessor(SpanProcessor):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a motivation to have MultiSpanProcessor be a specialized version of SpanProcessor? In general I've thought if an interface probably needs to support adding multiple processors, it would just be better to have the system consuming the SpanProcessor to take multiple?

e.g.

tracer.set_span_processor(Processor1(), Processor2())

rather than

tracer.set_span_processor(MultiSpanProcessor(Processor1(), Processor2())

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this use of the Composite Pattern is OK, as it is IMHO a well-understood coding-pattern. Putting that code directly in the span would not be much simpler. Though I agree that it currently is simpler and I don't know if we will ever have more consumers of span processors (though it is imaginable to add processors for before and after sampling stages, for example)

"""Implementation of :class:`SpanProcessor` that forwards all received
events to a list of `SpanProcessor`.
"""

def __init__(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not accept the list of processors in the constructor?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While this seems nice, I would vote for following the YAGNI-principle here and not add that to the constructor. Users of OpenTelemetry will probably never create (or even directly access) a MultiSpanProcessor themselves.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So far there is not an envisioned case where a MultipleSpanProcessor is created from a list of SpanProcessors. Currently they are added one at the single time after the MultipleSpanProcessor instance in the Tracer is been created.

MultiSpanProcessor is internal class, it is just an implementation choice for this problem.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to not make this class public, btw? This could be useful for users too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we keep this internal we can modify it as we want without worry about compatibility with users. Do you have a use case in mind for that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagine users wanting to plug multiple processors, simply. Unless this processor gets really complicated, or needs to be super complicated, I see no reason to not expose it.

Copy link
Member Author

@mauriciovasquezbernal mauriciovasquezbernal Sep 3, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be currently done using the Trace::add_span_processor method:

tracer.add_span_processor(sp1)
tracer.add_span_processor(sp2)
...

If you still think it is worthy to expose it, I'll do.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, we can add multiple processors already, but it would been nice to have this multi processor - in an ideal world, it would be simple and not married to the implementation, so there shouldn't be complicated to expose it.

In any case, lets keep it private for the time being then, unless somebody has objections. We can reconsider making it public later.

# use a tuple to avoid race conditions when adding a new span and
# iterating through it on "on_start" and "on_end".
self._span_processors = ()
self._lock = threading.Lock()

def add_span_processor(self, span_processor: SpanProcessor):
"""Adds a SpanProcessor to the list handled by this instance."""
with self._lock:
self._span_processors = self._span_processors + (span_processor,)

def on_start(self, span: "Span") -> None:
for sp in self._span_processors:
sp.on_start(span)

def on_end(self, span: "Span") -> None:
for sp in self._span_processors:
sp.on_end(span)

def shutdown(self) -> None:
for sp in self._span_processors:
sp.shutdown()


class Span(trace_api.Span):
"""See `opentelemetry.trace.Span`.

Expand All @@ -161,6 +223,8 @@ class Span(trace_api.Span):
attributes: The span's attributes to be exported
events: Timestamped events to be exported
links: Links to other spans to be exported
span_processor: `SpanProcessor` to invoke when starting and ending
this `Span`.
"""

# Initialize these lazily assuming most spans won't have them.
Expand All @@ -179,6 +243,7 @@ def __init__(
attributes: types.Attributes = None, # TODO
events: typing.Sequence[Event] = None, # TODO
links: typing.Sequence[Link] = None, # TODO
span_processor: SpanProcessor = SpanProcessor(),
) -> None:

self.name = name
Expand All @@ -190,6 +255,7 @@ def __init__(
self.attributes = attributes
self.events = events
self.links = links
self.span_processor = span_processor

if attributes is None:
self.attributes = Span.empty_attributes
Expand Down Expand Up @@ -247,10 +313,12 @@ def add_link(
def start(self):
if self.start_time is None:
self.start_time = util.time_ns()
self.span_processor.on_start(self)

def end(self):
if self.end_time is None:
self.end_time = util.time_ns()
self.span_processor.on_end(self)

def update_name(self, name: str) -> None:
self.name = name
Expand Down Expand Up @@ -286,6 +354,7 @@ def __init__(self, name: str = "") -> None:
if name:
slot_name = "{}.current_span".format(name)
self._current_span_slot = Context.register_slot(slot_name)
self._active_span_processor = MultiSpanProcessor()

def get_current_span(self):
"""See `opentelemetry.trace.Tracer.get_current_span`."""
Expand Down Expand Up @@ -325,7 +394,12 @@ def create_span(
parent_context.trace_options,
parent_context.trace_state,
)
return Span(name=name, context=context, parent=parent)
return Span(
name=name,
context=context,
parent=parent,
span_processor=self._active_span_processor,
)

@contextmanager
def use_span(self, span: "Span") -> typing.Iterator["Span"]:
Expand All @@ -339,5 +413,15 @@ def use_span(self, span: "Span") -> typing.Iterator["Span"]:
self._current_span_slot.set(span_snapshot)
span.end()

def add_span_processor(self, span_processor: SpanProcessor) -> None:
"""Registers a new :class:`SpanProcessor` for this `Tracer`.

The span processors are invoked in the same order they are registered.
"""

# no lock here because MultiSpanProcessor.add_span_processor is
# thread safe
self._active_span_processor.add_span_processor(span_processor)


tracer = Tracer()
113 changes: 113 additions & 0 deletions opentelemetry-sdk/tests/trace/test_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,116 @@ class TestSpan(unittest.TestCase):
def test_basic_span(self):
span = trace.Span("name", mock.Mock(spec=trace_api.SpanContext))
self.assertEqual(span.name, "name")


def span_event_start_fmt(span_processor_name, span_name):
return span_processor_name + ":" + span_name + ":start"


def span_event_end_fmt(span_processor_name, span_name):
return span_processor_name + ":" + span_name + ":end"


class MySpanProcessor(trace.SpanProcessor):
def __init__(self, name, span_list):
self.name = name
self.span_list = span_list

def on_start(self, span: "trace.Span") -> None:
self.span_list.append(span_event_start_fmt(self.name, span.name))

def on_end(self, span: "trace.Span") -> None:
self.span_list.append(span_event_end_fmt(self.name, span.name))


class TestSpanProcessor(unittest.TestCase):
def test_span_processor(self):
tracer = trace.Tracer()

spans_calls_list = [] # filled by MySpanProcessor
expected_list = [] # filled by hand

# Span processors are created but not added to the tracer yet
sp1 = MySpanProcessor("SP1", spans_calls_list)
sp2 = MySpanProcessor("SP2", spans_calls_list)

with tracer.start_span("foo"):
with tracer.start_span("bar"):
with tracer.start_span("baz"):
pass

# at this point lists must be empty
self.assertEqual(len(spans_calls_list), 0)

# add single span processor
tracer.add_span_processor(sp1)

with tracer.start_span("foo"):
expected_list.append(span_event_start_fmt("SP1", "foo"))

with tracer.start_span("bar"):
expected_list.append(span_event_start_fmt("SP1", "bar"))

with tracer.start_span("baz"):
expected_list.append(span_event_start_fmt("SP1", "baz"))

expected_list.append(span_event_end_fmt("SP1", "baz"))

expected_list.append(span_event_end_fmt("SP1", "bar"))

expected_list.append(span_event_end_fmt("SP1", "foo"))

self.assertListEqual(spans_calls_list, expected_list)

spans_calls_list.clear()
expected_list.clear()

# go for multiple span processors
tracer.add_span_processor(sp2)

with tracer.start_span("foo"):
expected_list.append(span_event_start_fmt("SP1", "foo"))
expected_list.append(span_event_start_fmt("SP2", "foo"))

with tracer.start_span("bar"):
expected_list.append(span_event_start_fmt("SP1", "bar"))
expected_list.append(span_event_start_fmt("SP2", "bar"))

with tracer.start_span("baz"):
expected_list.append(span_event_start_fmt("SP1", "baz"))
expected_list.append(span_event_start_fmt("SP2", "baz"))

expected_list.append(span_event_end_fmt("SP1", "baz"))
expected_list.append(span_event_end_fmt("SP2", "baz"))

expected_list.append(span_event_end_fmt("SP1", "bar"))
expected_list.append(span_event_end_fmt("SP2", "bar"))

expected_list.append(span_event_end_fmt("SP1", "foo"))
expected_list.append(span_event_end_fmt("SP2", "foo"))

# compare if two lists are the same
self.assertListEqual(spans_calls_list, expected_list)

def test_add_span_processor_after_span_creation(self):
tracer = trace.Tracer()

spans_calls_list = [] # filled by MySpanProcessor
expected_list = [] # filled by hand

# Span processors are created but not added to the tracer yet
sp = MySpanProcessor("SP1", spans_calls_list)

with tracer.start_span("foo"):
with tracer.start_span("bar"):
with tracer.start_span("baz"):
# add span processor after spans have been created
tracer.add_span_processor(sp)

expected_list.append(span_event_end_fmt("SP1", "baz"))

expected_list.append(span_event_end_fmt("SP1", "bar"))

expected_list.append(span_event_end_fmt("SP1", "foo"))

self.assertListEqual(spans_calls_list, expected_list)