Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Add experimental consistent samplers
([#4714](https://github.com/open-telemetry/opentelemetry-python/pull/4714))

## Version 1.36.0/0.57b0 (2025-07-29)

- Add missing Prometheus exporter documentation
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
__all__ = [
"ComposableSampler",
"ConsistentSampler",
"SamplingIntent",
"consistent_always_off",
"consistent_always_on",
"consistent_parent_based",
"consistent_probability_based",
]


from ._always_off import consistent_always_off
from ._always_on import consistent_always_on
from ._composable import ComposableSampler, SamplingIntent
from ._fixed_threshold import consistent_probability_based
from ._parent_based import consistent_parent_based
from ._sampler import ConsistentSampler
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from typing import Optional, Sequence

from opentelemetry.context import Context
from opentelemetry.trace import Link, SpanKind, TraceState
from opentelemetry.util.types import Attributes

from ._composable import ComposableSampler, SamplingIntent
from ._sampler import ConsistentSampler
from ._util import INVALID_THRESHOLD

_intent = SamplingIntent(
threshold=INVALID_THRESHOLD, adjusted_count_reliable=False
)


class ConsistentAlwaysOffSampler(ComposableSampler):
def sampling_intent(
self,
parent_ctx: Optional[Context],
name: str,
span_kind: Optional[SpanKind],
attributes: Attributes,
links: Optional[Sequence[Link]],
trace_state: Optional[TraceState] = None,
) -> SamplingIntent:
return _intent

def get_description(self) -> str:
return "ConsistentAlwaysOffSampler"


_always_off = ConsistentSampler(ConsistentAlwaysOffSampler())


def consistent_always_off() -> ConsistentSampler:
"""Returns a consistent sampler that does not sample any span."""
return _always_off
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from typing import Optional, Sequence

from opentelemetry.context import Context
from opentelemetry.trace import Link, SpanKind, TraceState
from opentelemetry.util.types import Attributes

from ._composable import ComposableSampler, SamplingIntent
from ._sampler import ConsistentSampler
from ._util import MIN_THRESHOLD

_intent = SamplingIntent(threshold=MIN_THRESHOLD)


class ConsistentAlwaysOnSampler(ComposableSampler):
def sampling_intent(
self,
parent_ctx: Optional[Context],
name: str,
span_kind: Optional[SpanKind],
attributes: Attributes,
links: Optional[Sequence[Link]],
trace_state: Optional[TraceState] = None,
) -> SamplingIntent:
return _intent

def get_description(self) -> str:
return "ConsistentAlwaysOnSampler"


_always_on = ConsistentSampler(ConsistentAlwaysOnSampler())


def consistent_always_on() -> ConsistentSampler:
"""Returns a consistent sampler that samples all spans."""
return _always_on
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from dataclasses import dataclass, field
from typing import Callable, Optional, Protocol, Sequence

from opentelemetry.context import Context
from opentelemetry.trace import Link, SpanKind, TraceState
from opentelemetry.util.types import Attributes


@dataclass(frozen=True)
class SamplingIntent:
"""Information to make a consistent sampling decision."""

threshold: int
adjusted_count_reliable: bool = field(default=True)
attributes: Attributes = field(default=None)
update_trace_state: Callable[[TraceState], TraceState] = field(
default=lambda ts: ts
)


class ComposableSampler(Protocol):
"""A sampler that can be composed to make a final consistent sampling decision."""

def sampling_intent(
self,
parent_ctx: Optional[Context],
name: str,
span_kind: Optional[SpanKind],
attributes: Attributes,
links: Optional[Sequence[Link]],
trace_state: Optional[TraceState],
) -> SamplingIntent:
"""Returns information to make a consistent sampling decision."""

def get_description(self) -> str:
"""Returns a description of the sampler."""
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from typing import Optional, Sequence

from opentelemetry.context import Context
from opentelemetry.trace import Link, SpanKind, TraceState
from opentelemetry.util.types import Attributes

from ._composable import ComposableSampler, SamplingIntent
from ._sampler import ConsistentSampler
from ._trace_state import serialize_th
from ._util import INVALID_THRESHOLD, MAX_THRESHOLD, calculate_threshold


class ConsistentFixedThresholdSampler(ComposableSampler):
_threshold: int
_description: str

def __init__(self, sampling_probability: float):
threshold = calculate_threshold(sampling_probability)
if threshold == MAX_THRESHOLD:
threshold_str = "max"
else:
threshold_str = serialize_th(threshold)
threshold = (
INVALID_THRESHOLD if threshold == MAX_THRESHOLD else threshold
)
self._intent = SamplingIntent(threshold=threshold)
self._description = f"ConsistentFixedThresholdSampler{{threshold={threshold_str}, sampling probability={sampling_probability}}}"

def sampling_intent(
self,
parent_ctx: Optional[Context],
name: str,
span_kind: Optional[SpanKind],
attributes: Attributes,
links: Optional[Sequence[Link]],
trace_state: Optional[TraceState] = None,
) -> SamplingIntent:
return self._intent

def get_description(self) -> str:
return self._description


def consistent_probability_based(
sampling_probability: float,
) -> ConsistentSampler:
"""Returns a consistent sampler that samples each span with a fixed probability."""
if not 0.0 <= sampling_probability <= 1.0:
raise ValueError("Sampling probability must be between 0.0 and 1.0")

return ConsistentSampler(
ConsistentFixedThresholdSampler(sampling_probability)
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from typing import Optional, Sequence

from opentelemetry.context import Context
from opentelemetry.trace import Link, SpanKind, TraceState, get_current_span
from opentelemetry.util.types import Attributes

from ._composable import ComposableSampler, SamplingIntent
from ._sampler import ConsistentSampler
from ._trace_state import OtelTraceState
from ._util import (
INVALID_THRESHOLD,
MIN_THRESHOLD,
is_valid_threshold,
)


class ConsistentParentBasedSampler(ComposableSampler):
def __init__(self, root_sampler: ComposableSampler):
self._root_sampler = root_sampler
self._description = f"ConsistentParentBasedSampler{{root_sampler={root_sampler.get_description()}}}"

def sampling_intent(
self,
parent_ctx: Optional[Context],
name: str,
span_kind: Optional[SpanKind],
attributes: Attributes,
links: Optional[Sequence[Link]],
trace_state: Optional[TraceState] = None,
) -> SamplingIntent:
parent_span = get_current_span(parent_ctx)
parent_span_ctx = parent_span.get_span_context()
is_root = not parent_span_ctx.is_valid
if is_root:
return self._root_sampler.sampling_intent(
parent_ctx, name, span_kind, attributes, links, trace_state
)

ot_trace_state = OtelTraceState.parse(trace_state)

if is_valid_threshold(ot_trace_state.threshold):
return SamplingIntent(
threshold=ot_trace_state.threshold,
adjusted_count_reliable=True,
)

threshold = (
MIN_THRESHOLD
if parent_span_ctx.trace_flags.sampled
else INVALID_THRESHOLD
)
return SamplingIntent(
threshold=threshold, adjusted_count_reliable=False
)

def get_description(self) -> str:
return self._description


def consistent_parent_based(
root_sampler: ComposableSampler,
) -> ConsistentSampler:
"""Returns a consistent sampler that respects the sampling decision of
the parent span or falls-back to the given sampler if it is a root span."""
return ConsistentSampler(ConsistentParentBasedSampler(root_sampler))
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
from typing import Optional, Sequence

from opentelemetry.context import Context
from opentelemetry.sdk.trace.sampling import Decision, Sampler, SamplingResult
from opentelemetry.trace import Link, SpanKind, TraceState
from opentelemetry.util.types import Attributes

from ._composable import ComposableSampler, SamplingIntent
from ._trace_state import OTEL_TRACE_STATE_KEY, OtelTraceState
from ._util import INVALID_THRESHOLD, is_valid_random_value, is_valid_threshold


class ConsistentSampler(Sampler, ComposableSampler):
"""A sampler that uses a consistent sampling strategy based on a delegate sampler."""

def __init__(self, delegate: ComposableSampler):
self._delegate = delegate

def should_sample(
self,
parent_context: Optional[Context],
trace_id: int,
name: str,
kind: Optional[SpanKind] = None,
attributes: Attributes = None,
links: Optional[Sequence[Link]] = None,
trace_state: Optional[TraceState] = None,
) -> SamplingResult:
ot_trace_state = OtelTraceState.parse(trace_state)

intent = self._delegate.sampling_intent(
parent_context, name, kind, attributes, links, trace_state
)
threshold = intent.threshold

if is_valid_threshold(threshold):
adjusted_count_correct = intent.adjusted_count_reliable
if is_valid_random_value(ot_trace_state.random_value):
randomness = ot_trace_state.random_value
else:
# Use last 56 bits of trace_id as randomness
randomness = trace_id & 0x00FFFFFFFFFFFFFF
sampled = threshold <= randomness
else:
sampled = False
adjusted_count_correct = False

decision = Decision.RECORD_AND_SAMPLE if sampled else Decision.DROP
if sampled and adjusted_count_correct:
ot_trace_state.threshold = threshold
else:
ot_trace_state.threshold = INVALID_THRESHOLD

return SamplingResult(
decision,
intent.attributes,
_update_trace_state(trace_state, ot_trace_state, intent),
)

def sampling_intent(
self,
parent_ctx: Optional[Context],
name: str,
span_kind: Optional[SpanKind],
attributes: Attributes,
links: Optional[Sequence[Link]],
trace_state: Optional[TraceState],
) -> SamplingIntent:
return self._delegate.sampling_intent(
parent_ctx, name, span_kind, attributes, links, trace_state
)

def get_description(self) -> str:
return self._delegate.get_description()


def _update_trace_state(
trace_state: Optional[TraceState],
ot_trace_state: OtelTraceState,
intent: SamplingIntent,
) -> Optional[TraceState]:
otts = ot_trace_state.serialize()
if not trace_state:
if otts:
return TraceState(((OTEL_TRACE_STATE_KEY, otts),))
return None
new_trace_state = intent.update_trace_state(trace_state)
if otts:
return new_trace_state.update(OTEL_TRACE_STATE_KEY, otts)
return new_trace_state
Loading
Loading