Update BatchSpanProcessor to use new BatchProcessor class #4580


Open · wants to merge 9 commits into main
7 changes: 4 additions & 3 deletions CHANGELOG.md
@@ -9,9 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- typecheck: add sdk/resources and drop mypy
([#4578](https://github.com/open-telemetry/opentelemetry-python/pull/4578))
- Refactor `BatchLogRecordProcessor` to simplify code and make the control flow more
clear ([#4562](https://github.com/open-telemetry/opentelemetry-python/pull/4562/)
and [#4535](https://github.com/open-telemetry/opentelemetry-python/pull/4535)).
- Refactor `BatchLogRecordProcessor` and `BatchSpanProcessor` to simplify code
and make the control flow more clear ([#4562](https://github.com/open-telemetry/opentelemetry-python/pull/4562/),
[#4535](https://github.com/open-telemetry/opentelemetry-python/pull/4535), and
[#4580](https://github.com/open-telemetry/opentelemetry-python/pull/4580)).

## Version 1.33.0/0.54b0 (2025-05-09)

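For context on the changelog entry above: the refactor keeps each processor's public API and moves the queueing and flush logic into the shared internal `BatchProcessor`. The toy sketch below illustrates the shape of the delegation; it is a simplified, hypothetical stand-in for the real class in `opentelemetry.sdk._shared_internal`, and it omits the worker thread and batching details.

```python
# Hypothetical, simplified stand-in for the SDK's shared BatchProcessor;
# illustration of the delegation pattern only, not the real code.
import threading
from collections import deque
from typing import Optional


class ToyBatchProcessor:
    """Owns the queue and flush logic, shared by span and log processors."""

    def __init__(self, exporter, max_queue_size: int = 2048):
        self._exporter = exporter
        self._queue = deque(maxlen=max_queue_size)
        self._lock = threading.Lock()
        self._shutdown = False

    def emit(self, item) -> None:
        if not self._shutdown:
            self._queue.appendleft(item)

    def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
        if self._shutdown:
            return False
        with self._lock:
            batch = list(self._queue)
            self._queue.clear()
        if batch:
            self._exporter.export(batch)
        return True

    def shutdown(self) -> None:
        self.force_flush()
        self._shutdown = True
        self._exporter.shutdown()


class ToySpanProcessor:
    """Thin facade: the SpanProcessor interface just delegates."""

    def __init__(self, exporter):
        self._batch_processor = ToyBatchProcessor(exporter)

    def on_end(self, span) -> None:
        self._batch_processor.emit(span)

    def shutdown(self) -> None:
        self._batch_processor.shutdown()

    def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
        return self._batch_processor.force_flush(timeout_millis)
```

The real `BatchProcessor` additionally runs a daemon worker thread and handles fork safety; the diffs below show the facade wired up to it.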
@@ -192,7 +192,7 @@ def emit(self, log_data: LogData) -> None:
def shutdown(self):
return self._batch_processor.shutdown()

def force_flush(self, timeout_millis: Optional[int] = None):
def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
return self._batch_processor.force_flush(timeout_millis)

@staticmethod
@@ -48,7 +48,7 @@ class BatchExportStrategy(enum.Enum):

class Exporter(Protocol[Telemetry]):
@abstractmethod
def export(self, batch: list[Telemetry]):
def export(self, batch: list[Telemetry], /):
raise NotImplementedError
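A note on the `/` added to `export`: it marks `batch` as positional-only, so structurally-typed implementations may name the parameter differently (the SDK's span exporter, for example, calls it `spans`) and still match the protocol. A small self-contained illustration; the classes here are hypothetical, not SDK code:

```python
from typing import Protocol


class Exporter(Protocol):
    # The `/` makes `batch` positional-only: callers may not write
    # export(batch=...), so implementations are free to rename it.
    def export(self, batch: list, /) -> None: ...


class ConsoleExporter:
    # Names the parameter `spans`, yet still satisfies Exporter,
    # because the protocol only requires positional calls.
    def export(self, spans: list, /) -> None:
        for item in spans:
            print(item)


def drain(exporter: Exporter, batch: list) -> None:
    exporter.export(batch)  # always called positionally


drain(ConsoleExporter(), ["span-a", "span-b"])
```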

@abstractmethod
@@ -191,8 +191,10 @@ def shutdown(self):
self._worker_thread.join()
self._exporter.shutdown()

def force_flush(self, timeout_millis: Optional[int] = None):
# TODO: Fix force flush so the timeout is used https://github.com/open-telemetry/opentelemetry-python/issues/4568.
def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
if self._shutdown:
return
return False
# Blocking call to export.
self._export(BatchExportStrategy.EXPORT_ALL)
return True
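The net effect of the hunk above: `force_flush` now reports whether a flush actually ran, returning `False` once the processor is shut down and `True` after the blocking export; per the TODO, `timeout_millis` is accepted but not yet honored (issue #4568). A quick caller-side sketch using the real SDK console exporter, assuming the behavior in this diff:

```python
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
    ConsoleSpanExporter,
)

processor = BatchSpanProcessor(ConsoleSpanExporter())

# Blocks on the export and returns True (the timeout is not yet enforced).
assert processor.force_flush(timeout_millis=5000) is True

processor.shutdown()

# After shutdown the call is a no-op and reports failure.
assert processor.force_flush() is False
```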
244 changes: 16 additions & 228 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
@@ -13,16 +13,11 @@
# limitations under the License.
from __future__ import annotations

import collections
import logging
import os
import sys
import threading
import typing
import weakref
from enum import Enum
from os import environ, linesep
from time import time_ns

from opentelemetry.context import (
_SUPPRESS_INSTRUMENTATION_KEY,
@@ -31,14 +26,14 @@
detach,
set_value,
)
from opentelemetry.sdk._shared_internal import BatchProcessor
from opentelemetry.sdk.environment_variables import (
OTEL_BSP_EXPORT_TIMEOUT,
OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
OTEL_BSP_MAX_QUEUE_SIZE,
OTEL_BSP_SCHEDULE_DELAY,
)
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
from opentelemetry.util._once import Once

_DEFAULT_SCHEDULE_DELAY_MILLIS = 5000
_DEFAULT_MAX_EXPORT_BATCH_SIZE = 512
@@ -125,19 +120,6 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
return True


class _FlushRequest:
"""Represents a request for the BatchSpanProcessor to flush spans."""

__slots__ = ["event", "num_spans"]

def __init__(self):
self.event = threading.Event()
self.num_spans = 0


_BSP_RESET_ONCE = Once()


class BatchSpanProcessor(SpanProcessor):
"""Batch span processor implementation.

@@ -151,6 +133,8 @@ class BatchSpanProcessor(SpanProcessor):
- :envvar:`OTEL_BSP_MAX_QUEUE_SIZE`
- :envvar:`OTEL_BSP_MAX_EXPORT_BATCH_SIZE`
- :envvar:`OTEL_BSP_EXPORT_TIMEOUT`

All the logic for emitting spans, shutting down, etc. resides in the shared `BatchProcessor` class.
"""

def __init__(
@@ -174,6 +158,7 @@ def __init__(
BatchSpanProcessor._default_max_export_batch_size()
)

# Not used. There is currently no way to pass the timeout to export.
if export_timeout_millis is None:
export_timeout_millis = (
BatchSpanProcessor._default_export_timeout_millis()
@@ -183,227 +168,30 @@
max_queue_size, schedule_delay_millis, max_export_batch_size
)

self.span_exporter = span_exporter
self.queue = collections.deque([], max_queue_size) # type: typing.Deque[Span]
self.worker_thread = threading.Thread(
name="OtelBatchSpanProcessor", target=self.worker, daemon=True
self._batch_processor = BatchProcessor(
span_exporter,
schedule_delay_millis,
max_export_batch_size,
export_timeout_millis,
max_queue_size,
"Span",
)
self.condition = threading.Condition(threading.Lock())
self._flush_request = None # type: typing.Optional[_FlushRequest]
self.schedule_delay_millis = schedule_delay_millis
self.max_export_batch_size = max_export_batch_size
self.max_queue_size = max_queue_size
self.export_timeout_millis = export_timeout_millis
self.done = False
# flag that indicates that spans are being dropped
self._spans_dropped = False
# preallocated list to send spans to the exporter
self.spans_list = [None] * self.max_export_batch_size # type: typing.List[typing.Optional[Span]]
self.worker_thread.start()
if hasattr(os, "register_at_fork"):
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
os.register_at_fork(after_in_child=lambda: weak_reinit()()) # pylint: disable=unnecessary-lambda
self._pid = os.getpid()

def on_start(
self, span: Span, parent_context: Context | None = None
) -> None:
pass

def on_end(self, span: ReadableSpan) -> None:
if self.done:
logger.warning("Already shutdown, dropping span.")
return
if not span.context.trace_flags.sampled:
return
if self._pid != os.getpid():
_BSP_RESET_ONCE.do_once(self._at_fork_reinit)

if len(self.queue) == self.max_queue_size:
if not self._spans_dropped:
logger.warning("Queue is full, likely spans will be dropped.")
self._spans_dropped = True

self.queue.appendleft(span)

if len(self.queue) >= self.max_export_batch_size:
with self.condition:
self.condition.notify()

def _at_fork_reinit(self):
self.condition = threading.Condition(threading.Lock())
self.queue.clear()

# worker_thread is local to a process, only the thread that issued fork continues
# to exist. A new worker thread must be started in child process.
self.worker_thread = threading.Thread(
name="OtelBatchSpanProcessor", target=self.worker, daemon=True
)
self.worker_thread.start()
self._pid = os.getpid()

def worker(self):
timeout = self.schedule_delay_millis / 1e3
flush_request = None # type: typing.Optional[_FlushRequest]
while not self.done:
with self.condition:
if self.done:
# done flag may have changed, avoid waiting
break
flush_request = self._get_and_unset_flush_request()
if (
len(self.queue) < self.max_export_batch_size
and flush_request is None
):
self.condition.wait(timeout)
flush_request = self._get_and_unset_flush_request()
if not self.queue:
# spurious notification, let's wait again, reset timeout
timeout = self.schedule_delay_millis / 1e3
self._notify_flush_request_finished(flush_request)
flush_request = None
continue
if self.done:
# missing spans will be sent when calling flush
break

# subtract the duration of this export call from the next timeout
start = time_ns()
self._export(flush_request)
end = time_ns()
duration = (end - start) / 1e9
timeout = self.schedule_delay_millis / 1e3 - duration

self._notify_flush_request_finished(flush_request)
flush_request = None

# there might have been a new flush request while export was running
# and before the done flag switched to true
with self.condition:
shutdown_flush_request = self._get_and_unset_flush_request()

# be sure that all spans are sent
self._drain_queue()
self._notify_flush_request_finished(flush_request)
self._notify_flush_request_finished(shutdown_flush_request)

def _get_and_unset_flush_request(
self,
) -> typing.Optional[_FlushRequest]:
"""Returns the current flush request and makes it invisible to the
worker thread for subsequent calls.
"""
flush_request = self._flush_request
self._flush_request = None
if flush_request is not None:
flush_request.num_spans = len(self.queue)
return flush_request

@staticmethod
def _notify_flush_request_finished(
flush_request: typing.Optional[_FlushRequest],
):
"""Notifies the flush initiator(s) waiting on the given request/event
that the flush operation was finished.
"""
if flush_request is not None:
flush_request.event.set()

def _get_or_create_flush_request(self) -> _FlushRequest:
"""Either returns the current active flush event or creates a new one.

The flush event will be visible and read by the worker thread before an
export operation starts. Callers of a flush operation may wait on the
returned event to be notified when the flush/export operation was
finished.

This method is not thread-safe, i.e. callers need to take care about
synchronization/locking.
"""
if self._flush_request is None:
self._flush_request = _FlushRequest()
return self._flush_request

self._batch_processor.emit(span)

def shutdown(self):
return self._batch_processor.shutdown()

def _export(self, flush_request: typing.Optional[_FlushRequest]):
"""Exports spans considering the given flush_request.

If a flush_request is given, spans are exported in batches until
the number of exported spans reaches or exceeds the number of spans in
the flush request.
If no flush_request was given, at most max_export_batch_size spans are
exported.
"""
if not flush_request:
self._export_batch()
return

num_spans = flush_request.num_spans
while self.queue:
num_exported = self._export_batch()
num_spans -= num_exported

if num_spans <= 0:
break

def _export_batch(self) -> int:
"""Exports at most max_export_batch_size spans and returns the number of
exported spans.
"""
idx = 0
# currently only a single thread acts as consumer, so queue.pop() will
# not raise an exception
while idx < self.max_export_batch_size and self.queue:
self.spans_list[idx] = self.queue.pop()
idx += 1
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
try:
# Ignore type b/c the Optional[None]+slicing is too "clever"
# for mypy
self.span_exporter.export(self.spans_list[:idx]) # type: ignore
except Exception: # pylint: disable=broad-exception-caught
logger.exception("Exception while exporting Span batch.")
detach(token)

# clean up list
for index in range(idx):
self.spans_list[index] = None
return idx

def _drain_queue(self):
"""Export all elements until queue is empty.

Can only be called from the worker thread context because it invokes
`export`, which is not thread-safe.
"""
while self.queue:
self._export_batch()

def force_flush(self, timeout_millis: int | None = None) -> bool:
if timeout_millis is None:
timeout_millis = self.export_timeout_millis

if self.done:
logger.warning("Already shutdown, ignoring call to force_flush().")
return True

with self.condition:
flush_request = self._get_or_create_flush_request()
# signal the worker thread to flush and wait for it to finish
self.condition.notify_all()

# wait for token to be processed
ret = flush_request.event.wait(timeout_millis / 1e3)
if not ret:
logger.warning("Timeout was exceeded in force_flush().")
return ret

def shutdown(self) -> None:
# signal the worker thread to finish and then wait for it
self.done = True
with self.condition:
self.condition.notify_all()
self.worker_thread.join()
self.span_exporter.shutdown()
def force_flush(self, timeout_millis: typing.Optional[int] = None) -> bool:
return self._batch_processor.force_flush(timeout_millis)

@staticmethod
def _default_max_queue_size():
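For reviewers who want to exercise the refactored processor end to end, the public wiring is unchanged; a standard snippet using the real SDK API, with behavior as described in this PR:

```python
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
    ConsoleSpanExporter,
)

provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
tracer = provider.get_tracer(__name__)

with tracer.start_as_current_span("demo"):
    pass  # on end, the span is queued by the shared BatchProcessor

provider.force_flush()  # drains the queue via the batch processor
provider.shutdown()
```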