diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/examples/manual/main.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/examples/manual/main.py index 4b0c121b7a..83b8607949 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/examples/manual/main.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/examples/manual/main.py @@ -1,8 +1,9 @@ # pylint: skip-file +import json import os +import uuid from openai import OpenAI - # NOTE: OpenTelemetry Python Logs and Events APIs are in beta from opentelemetry import _events, _logs, trace from opentelemetry.exporter.otlp.proto.grpc._log_exporter import ( @@ -13,11 +14,14 @@ ) from opentelemetry.instrumentation.openai_v2 import OpenAIInstrumentor from opentelemetry.sdk._events import EventLoggerProvider -from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk._logs import LoggerProvider, LogRecordProcessor, LogData from opentelemetry.sdk._logs.export import BatchLogRecordProcessor from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor +from dotenv import load_dotenv +load_dotenv() + # configure tracing trace.set_tracer_provider(TracerProvider()) trace.get_tracer_provider().add_span_processor( @@ -31,21 +35,139 @@ ) _events.set_event_logger_provider(EventLoggerProvider()) -# instrument OpenAI -OpenAIInstrumentor().instrument() +def prompt_uploader(span, event, prompt): + attribute_name = "gen_ai.request.inputs_ref" + ref = f"https://my-storage/bucket/{uuid.uuid4()}" + print(f"Uploading prompt to {ref}, prompt: {prompt}") + + if span.is_recording(): + span.set_attribute(attribute_name, ref) + span.set_attribute("gen_ai.request.inputs", "") + + if event: + event.attributes[attribute_name] = ref + event.body = f"prompt uploaded to {ref}" + +def completion_uploader(span, event, completion): + attribute_name = "gen_ai.response.outputs_ref" + ref = f"https://my-storage/bucket/{uuid.uuid4()}" + print(f"Uploading completion to {ref}, completion: {completion}") + + if span.is_recording(): + span.set_attribute(attribute_name, ref) + span.set_attribute("gen_ai.response.outputs", "") + + if event: + event.attributes[attribute_name] = ref + event.body = f"completion uploaded to {ref}" + +OpenAIInstrumentor().instrument( + capture_sensitive_content=True, # the same as existing OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT env var + capture_verbose_attributes=True, + + # | sensitive_content | verbose_attributes | Result | + # |-------------------|--------------------|------------------------------------------------------------------------------------------------------------------------------- | + # | False (default) | False (default) | Prompts/completions - not captured. Not-sensitive opt-in attributes - not captured | + # | False (default) | True | Prompts/completions - not captured. Not-sensitive opt-in attributes - captured | + # | True | False (default) | Prompts/completions - captured on events if DEBUG level is enabled. Not-sensitive opt-in attributes - not captured | + # | True | True | Prompts/completions - captured on attributes and events if DEBUG level is enabled. Not-sensitive opt-in attributes - captured | + # can probably merge two flags in one enum + + # optional hooks, independent from above flags: + # prompt_hook=prompt_uploader, + # completion_hook=completion_uploader +) + +weather_tool = { + "type": "function", + "function": { + "name": "get_weather", + "description": "Get current weather for a given location", + "parameters": { + "type": "object", + "properties": { + "location": {"type": "string", "description": "City or location name"} + }, + "required": ["location"] + } + } +} + +location_tool = { + "type": "function", + "function": { + "name": "get_location", + "description": "Get location information", + "parameters": { + } + } +} + +response_format = { + "type": "json_schema", + "json_schema": { + "name":"weather_forecast", + "schema" : { + "type": "object", + "properties": { + "temperature": { + "type": "string", + }, + "precipitation": { + "type": "string", + }, + }, + "required": ["temperature"], + "additionalProperties": True, + } + } +} +tracer = trace.get_tracer(__name__) +@tracer.start_as_current_span("main") def main(): client = OpenAI() - chat_completion = client.chat.completions.create( - model=os.getenv("CHAT_MODEL", "gpt-4o-mini"), - messages=[ + model=os.getenv("CHAT_MODEL", "gpt-4o-mini") + + messages = [ + {"role": "system", "content": "You are a helpful assistant."}, { "role": "user", - "content": "Write a short poem on OpenTelemetry.", + "content": "What is the weather like?", }, - ], + ] + + chat_completion = client.chat.completions.create( + model=model, + messages=messages, + tools=[location_tool, weather_tool], + response_format=response_format ) + + while chat_completion.choices[0].finish_reason == "tool_calls": + messages.append(chat_completion.choices[0].message) + # Call the tool with the response from the model + for call in chat_completion.choices[0].message.tool_calls: + function_args = json.loads(call.function.arguments) + if call.function.name == "get_weather": + messages.append( + {"tool_call_id": call.id, + "role": "tool", + "name": "get_weather", + "content": f"Weather in {function_args['location']} is sunny and 75 degrees."}) + if call.function.name == "get_location": + messages.append( + {"tool_call_id": call.id, + "role": "tool", + "name": "get_location", + "content": "Seattle, WA"}) + chat_completion = client.chat.completions.create( + model=model, + messages=messages, + tools=[location_tool, weather_tool], + response_format=response_format) + print(chat_completion.choices[0].message.content) diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/__init__.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/__init__.py index ab4b6f9d7b..71917b5a59 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/__init__.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/__init__.py @@ -40,11 +40,12 @@ --- """ +import json from typing import Collection from wrapt import wrap_function_wrapper -from opentelemetry._events import get_event_logger +from opentelemetry._events import get_event_logger, Event from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.openai_v2.package import _instruments from opentelemetry.instrumentation.openai_v2.utils import is_content_enabled @@ -52,6 +53,7 @@ from opentelemetry.metrics import get_meter from opentelemetry.semconv.schemas import Schemas from opentelemetry.trace import get_tracer +from opentelemetry._logs.severity import SeverityNumber from .instruments import Instruments from .patch import async_chat_completions_create, chat_completions_create @@ -64,6 +66,69 @@ def __init__(self): def instrumentation_dependencies(self) -> Collection[str]: return _instruments + def record_prompt_content(self, span, prompt_body): + """Default prompt hook.""" + + prompt_event = None + + if self.content_enabled: + # TODO: perf - check if DEBUG is enabled + prompt_event = Event( + "gen_ai.request.inputs", + body=prompt_body, # note, semconvs are switching to event attributes instead of body, so this is temporary + attributes={ + "gen_ai.system": "openai", + }, + severity_number=SeverityNumber.DEBUG, + ) + + if self.capture_verbose_attributes and span.is_recording(): + span.set_attribute("gen_ai.request.inputs", json.dumps(prompt_body, ensure_ascii=False)) + + if self.custom_prompt_hook: + try: + self.custom_prompt_hook(span, prompt_event, prompt_body) + except Exception as e: + # TODO - proper internal logging + print(f"Error in prompt hook, turning it off: {e}") + self.custom_prompt_hook = None + pass + + # prompt hook can modify the event, so we need to emit it after the hook is called + if prompt_event: + self.event_logger.emit(prompt_event) + + def record_completion_content(self, span, completion_body): + """Default completion hook.""" + + completion_event = None + if self.content_enabled: + # TODO: perf - check if DEBUG is enabled + completion_event = Event( + "gen_ai.response.outputs", + body=completion_body, # note, semconvs are switching to event attributes instead of body, so this is temporary + attributes={ + "gen_ai.system": "openai", + }, + severity_number=SeverityNumber.DEBUG, + ) + + if self.capture_verbose_attributes and span.is_recording(): + span.set_attribute("gen_ai.response.outputs", json.dumps(completion_body, ensure_ascii=False)) + + if self.custom_completion_hook: + try: + self.custom_completion_hook(span, completion_event, completion_body) + except Exception as e: + # TODO - proper internal logging + print(f"Error in completion hook, turning it off: {e}") + self.custom_completion_hook = None + pass + + # completion hook can modify the event, so we need to emit it after the hook is called + if completion_event: + self.event_logger.emit(completion_event) + def _instrument(self, **kwargs): """Enable OpenAI instrumentation.""" tracer_provider = kwargs.get("tracer_provider") @@ -74,7 +139,7 @@ def _instrument(self, **kwargs): schema_url=Schemas.V1_28_0.value, ) event_logger_provider = kwargs.get("event_logger_provider") - event_logger = get_event_logger( + self.event_logger = get_event_logger( __name__, "", schema_url=Schemas.V1_28_0.value, @@ -90,11 +155,25 @@ def _instrument(self, **kwargs): instruments = Instruments(self._meter) + self.capture_verbose_attributes = kwargs.get("capture_verbose_attributes", True) + + self.content_enabled = is_content_enabled() or kwargs.get("capture_sensitive_content", False) + self.custom_prompt_hook = kwargs.get("prompt_hook") + self.custom_completion_hook = kwargs.get("completion_hook") + + prompt_hook = None + if self.content_enabled or self.custom_prompt_hook: + prompt_hook = self.record_prompt_content + + completion_hook = None + if self.content_enabled or self.custom_completion_hook: + completion_hook = self.record_completion_content + wrap_function_wrapper( module="openai.resources.chat.completions", name="Completions.create", wrapper=chat_completions_create( - tracer, event_logger, instruments, is_content_enabled() + tracer, self.event_logger, instruments, prompt_hook, completion_hook, self.capture_verbose_attributes ), ) @@ -102,7 +181,7 @@ def _instrument(self, **kwargs): module="openai.resources.chat.completions", name="AsyncCompletions.create", wrapper=async_chat_completions_create( - tracer, event_logger, instruments, is_content_enabled() + tracer, self.event_logger, instruments, prompt_hook, completion_hook, self.capture_verbose_attributes ), ) diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py index 072365abb7..eb7eaab226 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py @@ -13,12 +13,14 @@ # limitations under the License. +import json from timeit import default_timer from typing import Optional from openai import Stream from opentelemetry._events import Event, EventLogger +from opentelemetry._logs import SeverityNumber from opentelemetry.semconv._incubating.attributes import ( gen_ai_attributes as GenAIAttributes, ) @@ -29,11 +31,11 @@ from .instruments import Instruments from .utils import ( - choice_to_event, + completion_to_object, get_llm_request_attributes, handle_span_exception, is_streaming, - message_to_event, + prompt_to_object, set_span_attribute, ) @@ -42,7 +44,9 @@ def chat_completions_create( tracer: Tracer, event_logger: EventLogger, instruments: Instruments, - capture_content: bool, + prompt_hook, + completion_hook, + capture_verbose_attributes: bool, ): """Wrap the `create` method of the `ChatCompletion` class to trace it.""" @@ -56,25 +60,30 @@ def traced_method(wrapped, instance, args, kwargs): attributes=span_attributes, end_on_exit=False, ) as span: - for message in kwargs.get("messages", []): - event_logger.emit(message_to_event(message, capture_content)) start = default_timer() + + if prompt_hook: + prompt_hook(span, prompt_to_object(kwargs.get("messages", []))) + + _maybe_capture_verbose_attributes(capture_verbose_attributes, span, kwargs) + result = None error_type = None try: result = wrapped(*args, **kwargs) if is_streaming(kwargs): return StreamWrapper( - result, span, event_logger, capture_content + result, span ) if span.is_recording(): _set_response_attributes( - span, result, event_logger, capture_content + span, result ) - for choice in getattr(result, "choices", []): - event_logger.emit(choice_to_event(choice, capture_content)) + + if completion_hook: + completion_hook(span, completion_to_object(getattr(result, "choices", []))) span.end() return result @@ -100,7 +109,9 @@ def async_chat_completions_create( tracer: Tracer, event_logger: EventLogger, instruments: Instruments, - capture_content: bool, + prompt_hook, + completion_hook, + capture_verbose_attributes: bool, ): """Wrap the `create` method of the `AsyncChatCompletion` class to trace it.""" @@ -114,25 +125,29 @@ async def traced_method(wrapped, instance, args, kwargs): attributes=span_attributes, end_on_exit=False, ) as span: - for message in kwargs.get("messages", []): - event_logger.emit(message_to_event(message, capture_content)) - start = default_timer() + + if prompt_hook: + prompt_hook(span, prompt_to_object(kwargs.get("messages", []))) + + _maybe_capture_verbose_attributes(capture_verbose_attributes, span, kwargs) + result = None error_type = None try: result = await wrapped(*args, **kwargs) if is_streaming(kwargs): return StreamWrapper( - result, span, event_logger, capture_content + result, span, event_logger, completion_hook ) if span.is_recording(): _set_response_attributes( - span, result, event_logger, capture_content + span, result ) - for choice in getattr(result, "choices", []): - event_logger.emit(choice_to_event(choice, capture_content)) + + if completion_hook: + completion_hook(span, completion_to_object(getattr(result, "choices", []))) span.end() return result @@ -153,6 +168,32 @@ async def traced_method(wrapped, instance, args, kwargs): return traced_method +def _maybe_capture_verbose_attributes( + capture_verbose_attributes: bool, + span: Span, + kwargs: dict): + + if capture_verbose_attributes and span.is_recording(): + + response_format = kwargs.get("response_format") + if response_format: + response_type = response_format.get("type") + if response_type == "json_schema": + json_schema = response_format.get("json_schema") + if json_schema: + set_span_attribute( + span, + "openai.response_format.json_schema", + json.dumps(json_schema, ensure_ascii=False), + ) + + tools = kwargs.get("tools") + if tools: + set_span_attribute( + span, + "gen_ai.tools.definitions", + json.dumps(tools, ensure_ascii=False), + ) def _record_metrics( instruments: Instruments, @@ -220,9 +261,7 @@ def _record_metrics( ) -def _set_response_attributes( - span, result, event_logger: EventLogger, capture_content: bool -): +def _set_response_attributes(span, result): set_span_attribute( span, GenAIAttributes.GEN_AI_RESPONSE_MODEL, result.model ) @@ -312,13 +351,13 @@ def __init__( stream: Stream, span: Span, event_logger: EventLogger, - capture_content: bool, + completion_hook, ): self.stream = stream self.span = span self.choice_buffers = [] self._span_started = False - self.capture_content = capture_content + self.completion_hook = completion_hook self.event_logger = event_logger self.setup() @@ -392,13 +431,13 @@ def cleanup(self): "finish_reason": choice.finish_reason or "error", "message": message, } - event_attributes = { GenAIAttributes.GEN_AI_SYSTEM: GenAIAttributes.GenAiSystemValues.OPENAI.value } # this span is not current, so we need to manually set the context on event span_ctx = self.span.get_span_context() + self.event_logger.emit( Event( name="gen_ai.choice", diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py index f8a837259e..98abc0d273 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py @@ -19,7 +19,6 @@ from httpx import URL from openai import NOT_GIVEN -from opentelemetry._events import Event from opentelemetry.semconv._incubating.attributes import ( gen_ai_attributes as GenAIAttributes, ) @@ -44,7 +43,7 @@ def is_content_enabled() -> bool: return capture_content.lower() == "true" -def extract_tool_calls(item, capture_content): +def extract_tool_calls(item): tool_calls = get_property_value(item, "tool_calls") if tool_calls is None: return None @@ -69,7 +68,7 @@ def extract_tool_calls(item, capture_content): tool_call_dict["function"]["name"] = name arguments = get_property_value(func, "arguments") - if capture_content and arguments: + if arguments: if isinstance(arguments, str): arguments = arguments.replace("\n", "") tool_call_dict["function"]["arguments"] = arguments @@ -103,64 +102,57 @@ def get_property_value(obj, property_name): return getattr(obj, property_name, None) - -def message_to_event(message, capture_content): - attributes = { - GenAIAttributes.GEN_AI_SYSTEM: GenAIAttributes.GenAiSystemValues.OPENAI.value - } - role = get_property_value(message, "role") - content = get_property_value(message, "content") - - body = {} - if capture_content and content: - body["content"] = content - if role == "assistant": - tool_calls = extract_tool_calls(message, capture_content) - if tool_calls: - body = {"tool_calls": tool_calls} - elif role == "tool": - tool_call_id = get_property_value(message, "tool_call_id") - if tool_call_id: - body["id"] = tool_call_id - - return Event( - name=f"gen_ai.{role}.message", - attributes=attributes, - body=body if body else None, - ) - - -def choice_to_event(choice, capture_content): - attributes = { - GenAIAttributes.GEN_AI_SYSTEM: GenAIAttributes.GenAiSystemValues.OPENAI.value - } - - body = { - "index": choice.index, - "finish_reason": choice.finish_reason or "error", - } - - if choice.message: - message = { - "role": ( - choice.message.role - if choice.message and choice.message.role - else None - ) +# TODO: expose common (across GenAI) public API for prompt payload +def prompt_to_object(messages): + body = [] + for message in messages: + role = get_property_value(message, "role") + content = get_property_value(message, "content") + + message_event = {} + if role: + message_event["role"] = role + if content: + message_event["content"] = content + if role == "assistant": + tool_calls = extract_tool_calls(message) + if tool_calls: + message_event = {"tool_calls": tool_calls} + elif role == "tool": + tool_call_id = get_property_value(message, "tool_call_id") + if tool_call_id: + message_event["id"] = tool_call_id + + body.append(message_event) + return body + +# TODO: expose common (across GenAI) public API for completion payload +def completion_to_object(choices): + body = [] + for choice in choices: + choice_event = { + "index": choice.index, + "finish_reason": choice.finish_reason or "error", } - tool_calls = extract_tool_calls(choice.message, capture_content) - if tool_calls: - message["tool_calls"] = tool_calls - content = get_property_value(choice.message, "content") - if capture_content and content: - message["content"] = content - body["message"] = message - - return Event( - name="gen_ai.choice", - attributes=attributes, - body=body, - ) + + if choice.message: + message = { + "role": ( + choice.message.role + if choice.message and choice.message.role + else None + ) + } + tool_calls = extract_tool_calls(choice.message) + if tool_calls: + message["tool_calls"] = tool_calls + content = get_property_value(choice.message, "content") + if content: + message["content"] = content + choice_event["message"] = message + body.append(choice_event) + + return body def set_span_attributes(span, attributes: dict):