Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#3567](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3567))
- `opentelemetry-resource-detector-containerid`: make it more quiet on platforms without cgroups
([#3579](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3579))
- `opentelemetry-instrumentation-botocore`: migrate off the deprecated events API to use the logs API
([#3624](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3624))

### Added

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def response_hook(span, service_name, operation_name, result):
from botocore.exceptions import ClientError
from wrapt import wrap_function_wrapper

from opentelemetry._events import get_event_logger
from opentelemetry._logs import get_logger
from opentelemetry.instrumentation.botocore.extensions import (
_find_extension,
_has_extension,
Expand Down Expand Up @@ -139,8 +139,8 @@ def _instrument(self, **kwargs):

# tracers are lazy initialized per-extension in _get_tracer
self._tracers = {}
# event_loggers are lazy initialized per-extension in _get_event_logger
self._event_loggers = {}
# loggers are lazy initialized per-extension in _get_logger
self._loggers = {}
# meters are lazy initialized per-extension in _get_meter
self._meters = {}
# metrics are lazy initialized per-extension in _get_metrics
Expand All @@ -154,7 +154,7 @@ def _instrument(self, **kwargs):
self.propagator = propagator

self.tracer_provider = kwargs.get("tracer_provider")
self.event_logger_provider = kwargs.get("event_logger_provider")
self.logger_provider = kwargs.get("logger_provider")
self.meter_provider = kwargs.get("meter_provider")

wrap_function_wrapper(
Expand Down Expand Up @@ -195,23 +195,22 @@ def _get_tracer(self, extension: _AwsSdkExtension):
)
return self._tracers[instrumentation_name]

def _get_event_logger(self, extension: _AwsSdkExtension):
"""This is a multiplexer in order to have an event logger per extension"""
def _get_logger(self, extension: _AwsSdkExtension):
"""This is a multiplexer in order to have a logger per extension"""

instrumentation_name = self._get_instrumentation_name(extension)
event_logger = self._event_loggers.get(instrumentation_name)
if event_logger:
return event_logger
if self._loggers.get(instrumentation_name):
return self._loggers.get(instrumentation_name)

schema_version = extension.event_logger_schema_version()
self._event_loggers[instrumentation_name] = get_event_logger(
self._loggers[instrumentation_name] = get_logger(
instrumentation_name,
"",
schema_url=f"https://opentelemetry.io/schemas/{schema_version}",
event_logger_provider=self.event_logger_provider,
logger_provider=self.logger_provider,
)

return self._event_loggers[instrumentation_name]
return self._loggers[instrumentation_name]

def _get_meter(self, extension: _AwsSdkExtension):
"""This is a multiplexer in order to have a meter per extension"""
Expand Down Expand Up @@ -287,11 +286,10 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
end_span_on_exit = extension.should_end_span_on_exit()

tracer = self._get_tracer(extension)
event_logger = self._get_event_logger(extension)
meter = self._get_meter(extension)
metrics = self._get_metrics(extension, meter)
instrumentor_ctx = _BotocoreInstrumentorContext(
event_logger=event_logger,
logger=self._get_logger(extension),
metrics=metrics,
)
with tracer.start_as_current_span(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,9 +457,9 @@ def before_service_call(

messages = self._get_request_messages()
for message in messages:
event_logger = instrumentor_context.event_logger
logger = instrumentor_context.logger
for event in message_to_event(message, capture_content):
event_logger.emit(event)
logger.emit(event)

if span.is_recording():
operation_name = span.attributes.get(GEN_AI_OPERATION_NAME, "")
Expand Down Expand Up @@ -501,12 +501,12 @@ def _converse_on_success(

# In case of an early stream closure, the result may not contain outputs
if self._stream_has_output_content(result):
event_logger = instrumentor_context.event_logger
logger = instrumentor_context.logger
choice = _Choice.from_converse(result, capture_content)
# this path is used by streaming apis, in that case we are already out of the span
# context so need to add the span context manually
span_ctx = span.get_span_context()
event_logger.emit(
logger.emit(
choice.to_choice_event(
trace_id=span_ctx.trace_id,
span_id=span_ctx.span_id,
Expand Down Expand Up @@ -729,11 +729,11 @@ def _handle_amazon_titan_response(
[result["completionReason"]],
)

event_logger = instrumentor_context.event_logger
logger = instrumentor_context.logger
choice = _Choice.from_invoke_amazon_titan(
response_body, capture_content
)
event_logger.emit(choice.to_choice_event())
logger.emit(choice.to_choice_event())

metrics = instrumentor_context.metrics
metrics_attributes = self._extract_metrics_attributes()
Expand Down Expand Up @@ -791,9 +791,9 @@ def _handle_amazon_nova_response(

# In case of an early stream closure, the result may not contain outputs
if self._stream_has_output_content(response_body):
event_logger = instrumentor_context.event_logger
logger = instrumentor_context.logger
choice = _Choice.from_converse(response_body, capture_content)
event_logger.emit(choice.to_choice_event())
logger.emit(choice.to_choice_event())

metrics = instrumentor_context.metrics
metrics_attributes = self._extract_metrics_attributes()
Expand Down Expand Up @@ -848,11 +848,11 @@ def _handle_anthropic_claude_response(
GEN_AI_RESPONSE_FINISH_REASONS, [response_body["stop_reason"]]
)

event_logger = instrumentor_context.event_logger
logger = instrumentor_context.logger
choice = _Choice.from_invoke_anthropic_claude(
response_body, capture_content
)
event_logger.emit(choice.to_choice_event())
logger.emit(choice.to_choice_event())

metrics = instrumentor_context.metrics
metrics_attributes = self._extract_metrics_attributes()
Expand Down Expand Up @@ -903,11 +903,11 @@ def _handle_cohere_command_r_response(
[response_body["finish_reason"]],
)

event_logger = instrumentor_context.event_logger
logger = instrumentor_context.logger
choice = _Choice.from_invoke_cohere_command_r(
response_body, capture_content
)
event_logger.emit(choice.to_choice_event())
logger.emit(choice.to_choice_event())

def _handle_cohere_command_response(
self,
Expand All @@ -929,11 +929,11 @@ def _handle_cohere_command_response(
[generations["finish_reason"]],
)

event_logger = instrumentor_context.event_logger
logger = instrumentor_context.logger
choice = _Choice.from_invoke_cohere_command(
response_body, capture_content
)
event_logger.emit(choice.to_choice_event())
logger.emit(choice.to_choice_event())

def _handle_meta_llama_response(
self,
Expand All @@ -956,9 +956,9 @@ def _handle_meta_llama_response(
GEN_AI_RESPONSE_FINISH_REASONS, [response_body["stop_reason"]]
)

event_logger = instrumentor_context.event_logger
logger = instrumentor_context.logger
choice = _Choice.from_invoke_meta_llama(response_body, capture_content)
event_logger.emit(choice.to_choice_event())
logger.emit(choice.to_choice_event())

def _handle_mistral_ai_response(
self,
Expand All @@ -979,11 +979,11 @@ def _handle_mistral_ai_response(
GEN_AI_RESPONSE_FINISH_REASONS, [outputs["stop_reason"]]
)

event_logger = instrumentor_context.event_logger
logger = instrumentor_context.logger
choice = _Choice.from_invoke_mistral_mistral(
response_body, capture_content
)
event_logger.emit(choice.to_choice_event())
logger.emit(choice.to_choice_event())

def on_error(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
from botocore.eventstream import EventStream, EventStreamError
from wrapt import ObjectProxy

from opentelemetry._events import Event
from opentelemetry._logs import LogRecord
from opentelemetry.context import get_current
from opentelemetry.instrumentation.botocore.environment_variables import (
OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT,
)
Expand Down Expand Up @@ -492,7 +493,7 @@ def extract_tool_results(

def message_to_event(
message: dict[str, Any], capture_content: bool
) -> Iterator[Event]:
) -> Iterator[LogRecord]:
attributes = {GEN_AI_SYSTEM: GenAiSystemValues.AWS_BEDROCK.value}
role = message.get("role")
content = message.get("content")
Expand All @@ -507,16 +508,18 @@ def message_to_event(
elif role == "user":
# in case of tool calls we send one tool event for tool call and one for the user event
for tool_body in extract_tool_results(message, capture_content):
yield Event(
name="gen_ai.tool.message",
yield LogRecord(
event_name="gen_ai.tool.message",
attributes=attributes,
body=tool_body,
context=get_current(),
)

yield Event(
name=f"gen_ai.{role}.message",
yield LogRecord(
event_name=f"gen_ai.{role}.message",
attributes=attributes,
body=body if body else None,
context=get_current(),
)


Expand Down Expand Up @@ -617,11 +620,12 @@ def _to_body_dict(self) -> dict[str, Any]:
"message": self.message,
}

def to_choice_event(self, **event_kwargs) -> Event:
def to_choice_event(self, **event_kwargs) -> LogRecord:
attributes = {GEN_AI_SYSTEM: GenAiSystemValues.AWS_BEDROCK.value}
return Event(
name="gen_ai.choice",
return LogRecord(
event_name="gen_ai.choice",
attributes=attributes,
body=self._to_body_dict(),
**event_kwargs,
context=get_current(),
)
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import logging
from typing import Any, Dict, Optional, Tuple

from opentelemetry._events import EventLogger
from opentelemetry._logs import Logger
from opentelemetry.metrics import Instrument, Meter
from opentelemetry.trace import SpanKind
from opentelemetry.trace.span import Span
Expand Down Expand Up @@ -96,10 +96,10 @@ def _get_attr(obj, name: str, default=None):
class _BotocoreInstrumentorContext:
def __init__(
self,
event_logger: EventLogger,
logger: Logger,
metrics: Dict[str, Instrument] | None = None,
):
self.event_logger = event_logger
self.logger = logger
self.metrics = metrics or {}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@
)
from opentelemetry.sdk.metrics._internal.point import ResourceMetrics
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.semconv._incubating.attributes import (
event_attributes as EventAttributes,
)
from opentelemetry.semconv._incubating.attributes import (
gen_ai_attributes as GenAIAttributes,
)
Expand Down Expand Up @@ -282,17 +279,17 @@ def remove_none_values(body):

def assert_log_parent(log, span):
if span:
assert log.log_record.trace_id == span.get_span_context().trace_id
assert (
log.log_record.trace_id == span.get_span_context().trace_id
), f"{span.get_span_context().trace_id} does not equal {log.log_record.trace_id}"
assert log.log_record.span_id == span.get_span_context().span_id
assert (
log.log_record.trace_flags == span.get_span_context().trace_flags
)


def assert_message_in_logs(log, event_name, expected_content, parent_span):
assert (
log.log_record.attributes[EventAttributes.EVENT_NAME] == event_name
), log.log_record.attributes[EventAttributes.EVENT_NAME]
assert log.log_record.event_name == event_name, log.log_record.event_name
assert (
log.log_record.attributes[GenAIAttributes.GEN_AI_SYSTEM]
== GenAIAttributes.GenAiSystemValues.AWS_BEDROCK.value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from opentelemetry.instrumentation.botocore.environment_variables import (
OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT,
)
from opentelemetry.sdk._events import EventLoggerProvider
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import (
InMemoryLogExporter,
Expand Down Expand Up @@ -55,13 +54,11 @@ def fixture_tracer_provider(span_exporter):
return provider


@pytest.fixture(scope="function", name="event_logger_provider")
def fixture_event_logger_provider(log_exporter):
@pytest.fixture(scope="function", name="logger_provider")
def fixture_logger_provider(log_exporter):
provider = LoggerProvider()
provider.add_log_record_processor(SimpleLogRecordProcessor(log_exporter))
event_logger_provider = EventLoggerProvider(provider)

return event_logger_provider
return provider


@pytest.fixture(scope="function", name="meter_provider")
Expand Down Expand Up @@ -102,17 +99,15 @@ def vcr_config():


@pytest.fixture(scope="function")
def instrument_no_content(
tracer_provider, event_logger_provider, meter_provider
):
def instrument_no_content(tracer_provider, logger_provider, meter_provider):
os.environ.update(
{OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT: "False"}
)

instrumentor = BotocoreInstrumentor()
instrumentor.instrument(
tracer_provider=tracer_provider,
event_logger_provider=event_logger_provider,
logger_provider=logger_provider,
meter_provider=meter_provider,
)

Expand All @@ -122,16 +117,14 @@ def instrument_no_content(


@pytest.fixture(scope="function")
def instrument_with_content(
tracer_provider, event_logger_provider, meter_provider
):
def instrument_with_content(tracer_provider, logger_provider, meter_provider):
os.environ.update(
{OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT: "True"}
)
instrumentor = BotocoreInstrumentor()
instrumentor.instrument(
tracer_provider=tracer_provider,
event_logger_provider=event_logger_provider,
logger_provider=logger_provider,
meter_provider=meter_provider,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2943,6 +2943,7 @@ def test_invoke_model_with_response_stream_handles_stream_error(
logs = log_exporter.get_finished_logs()
assert len(logs) == 1
user_content = {"content": [{"text": "Say this is a test"}]}
print(logs[0].log_record)
assert_message_in_logs(logs[0], "gen_ai.user.message", user_content, span)


Expand Down
Loading