From cf5fd9abbc0b379766f168d8dedda40bd53d2917 Mon Sep 17 00:00:00 2001 From: Liudmila Molkova Date: Wed, 30 Oct 2024 10:35:22 -0700 Subject: [PATCH 1/3] Minor fixes in vanilla OTel tracing sample --- .../samples/sample_chat_completions_with_tracing.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/ai/azure-ai-inference/samples/sample_chat_completions_with_tracing.py b/sdk/ai/azure-ai-inference/samples/sample_chat_completions_with_tracing.py index 978580f0c41e..8e70f29289cb 100644 --- a/sdk/ai/azure-ai-inference/samples/sample_chat_completions_with_tracing.py +++ b/sdk/ai/azure-ai-inference/samples/sample_chat_completions_with_tracing.py @@ -23,8 +23,7 @@ where `your-deployment-name` is your unique AI Model deployment name, and `your-azure-region` is the Azure region where your model is deployed. 2) AZURE_AI_CHAT_KEY - Your model key (a 32-character string). Keep it secret. - 3) AZURE_TRACING_GEN_AI_CONTENT_RECORDING_ENABLED - Optional. Set to 'true' - for detailed traces, including chat request and response messages. + 3) AZURE_TRACING_GEN_AI_CONTENT_RECORDING_ENABLED - Set to 'true' to enable content recording. """ From 0155be0471f0d1aeb663cc656e93a06cdfbd43d6 Mon Sep 17 00:00:00 2001 From: Liudmila Molkova Date: Thu, 19 Dec 2024 16:02:00 -0800 Subject: [PATCH 2/3] Provide distinct timestamps for content events for azmon --- .../azure/ai/inference/tracing.py | 99 ++++++++++++------- .../tests/gen_ai_trace_verifier.py | 7 ++ 2 files changed, 68 insertions(+), 38 deletions(-) diff --git a/sdk/ai/azure-ai-inference/azure/ai/inference/tracing.py b/sdk/ai/azure-ai-inference/azure/ai/inference/tracing.py index dc3f0ed982e4..eb2b144fbeea 100644 --- a/sdk/ai/azure-ai-inference/azure/ai/inference/tracing.py +++ b/sdk/ai/azure-ai-inference/azure/ai/inference/tracing.py @@ -9,6 +9,7 @@ import importlib import logging import os +from time import time_ns from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union from urllib.parse import urlparse @@ -193,7 +194,8 @@ def _set_attributes(self, span: "AbstractSpan", *attrs: Tuple[str, Any]) -> None if value is not None: span.add_attribute(key, value) - def _add_request_chat_message_event(self, span: "AbstractSpan", **kwargs: Any) -> None: + def _add_request_chat_message_events(self, span: "AbstractSpan", **kwargs: Any) -> int: + timestamp = 0 for message in kwargs.get("messages", []): try: message = message.as_dict() @@ -201,15 +203,18 @@ def _add_request_chat_message_event(self, span: "AbstractSpan", **kwargs: Any) - pass if message.get("role"): - name = f"gen_ai.{message.get('role')}.message" - span.span_instance.add_event( - name=name, - attributes={ + timestamp = self._record_event( + span, + f"gen_ai.{message.get('role')}.message", + { "gen_ai.system": _INFERENCE_GEN_AI_SYSTEM_NAME, "gen_ai.event.content": json.dumps(message), }, + timestamp, ) + return timestamp + def _parse_url(self, url): parsed = urlparse(url) server_address = parsed.hostname @@ -280,8 +285,9 @@ def _get_finish_reason_for_choice(self, choice): return "none" - def _add_response_chat_message_event(self, span: "AbstractSpan", result: _models.ChatCompletions) -> None: + def _add_response_chat_message_events(self, span: "AbstractSpan", result: _models.ChatCompletions, last_event_timestamp_ns: int) -> None: for choice in result.choices: + attributes = {} if _trace_inference_content: full_response: Dict[str, Any] = { "message": {"content": choice.message.content}, @@ -312,7 +318,7 @@ def _add_response_chat_message_event(self, span: 
"AbstractSpan", result: _models "gen_ai.system": _INFERENCE_GEN_AI_SYSTEM_NAME, "gen_ai.event.content": json.dumps(response), } - span.span_instance.add_event(name="gen_ai.choice", attributes=attributes) + last_event_timestamp_ns = self._record_event(span, "gen_ai.choice", attributes, last_event_timestamp_ns) def _add_response_chat_attributes( self, @@ -336,15 +342,16 @@ def _add_response_chat_attributes( if not finish_reasons is None: span.add_attribute("gen_ai.response.finish_reasons", finish_reasons) # type: ignore - def _add_request_span_attributes(self, span: "AbstractSpan", _span_name: str, args: Any, kwargs: Any) -> None: + def _add_request_details(self, span: "AbstractSpan", args: Any, kwargs: Any) -> int: self._add_request_chat_attributes(span, *args, **kwargs) if _trace_inference_content: - self._add_request_chat_message_event(span, **kwargs) + return self._add_request_chat_message_events(span, **kwargs) + return 0 - def _add_response_span_attributes(self, span: "AbstractSpan", result: object) -> None: + def _add_response_details(self, span: "AbstractSpan", result: object, last_event_timestamp_ns: int) -> None: if isinstance(result, _models.ChatCompletions): self._add_response_chat_attributes(span, result) - self._add_response_chat_message_event(span, result) + self._add_response_chat_message_events(span, result, last_event_timestamp_ns) # TODO add more models here def _accumulate_response(self, item, accumulate: Dict[str, Any]) -> None: @@ -410,7 +417,7 @@ def _accumulate_async_streaming_response(self, item, accumulate: Dict[str, Any]) accumulate["message"]["tool_calls"][-1]["function"]["arguments"] += tool_call.function.arguments def _wrapped_stream( - self, stream_obj: _models.StreamingChatCompletions, span: "AbstractSpan" + self, stream_obj: _models.StreamingChatCompletions, span: "AbstractSpan", previous_event_timestamp: int ) -> _models.StreamingChatCompletions: class StreamWrapper(_models.StreamingChatCompletions): def __init__(self, stream_obj, instrumentor): @@ -467,29 +474,27 @@ def __iter__( # pyright: ignore [reportIncompatibleMethodOverride] accumulate["message"]["tool_calls"] = list( tool_calls_function_names_and_arguments_removed ) - - span.span_instance.add_event( - name="gen_ai.choice", - attributes={ - "gen_ai.system": _INFERENCE_GEN_AI_SYSTEM_NAME, - "gen_ai.event.content": json.dumps(accumulate), - }, - ) + attributes={ + "gen_ai.system": _INFERENCE_GEN_AI_SYSTEM_NAME, + "gen_ai.event.content": json.dumps(accumulate), + } + self._instrumentor._record_event(span, "gen_ai.choice", attributes, previous_event_timestamp) span.finish() return StreamWrapper(stream_obj, self) def _async_wrapped_stream( - self, stream_obj: _models.AsyncStreamingChatCompletions, span: "AbstractSpan" + self, stream_obj: _models.AsyncStreamingChatCompletions, span: "AbstractSpan", last_event_timestamp_ns: int ) -> _models.AsyncStreamingChatCompletions: class AsyncStreamWrapper(_models.AsyncStreamingChatCompletions): - def __init__(self, stream_obj, instrumentor, span): + def __init__(self, stream_obj, instrumentor, span, last_event_timestamp_ns): super().__init__(stream_obj._response) self._instrumentor = instrumentor self._accumulate: Dict[str, Any] = {} self._stream_obj = stream_obj self.span = span self._last_result = None + self._last_event_timestamp_ns = last_event_timestamp_ns async def __anext__(self) -> "_models.StreamingChatCompletionsUpdate": try: @@ -523,19 +528,37 @@ def _trace_stream_content(self) -> None: self._accumulate["message"]["tool_calls"] ) 
self._accumulate["message"]["tool_calls"] = list(tools_no_recording)
-
-                    self.span.span_instance.add_event(
-                        name="gen_ai.choice",
-                        attributes={
-                            "gen_ai.system": _INFERENCE_GEN_AI_SYSTEM_NAME,
-                            "gen_ai.event.content": json.dumps(self._accumulate),
-                        },
-                    )
+                    attributes={
+                        "gen_ai.system": _INFERENCE_GEN_AI_SYSTEM_NAME,
+                        "gen_ai.event.content": json.dumps(self._accumulate),
+                    }
+                    self._previous_event_timestamp = self._instrumentor._record_event(span, "gen_ai.choice", attributes, self._last_event_timestamp_ns)
                 span.finish()

-        async_stream_wrapper = AsyncStreamWrapper(stream_obj, self, span)
+        async_stream_wrapper = AsyncStreamWrapper(stream_obj, self, span, last_event_timestamp_ns)
         return async_stream_wrapper

+    def _record_event(self, span: "AbstractSpan", name: str, attributes: Dict[str, Any], last_event_timestamp_ns: int) -> int:
+        timestamp = time_ns()
+
+        # We're recording multiple events; some of them are emitted within (hundreds of) nanoseconds of each other.
+        # time.time_ns resolution is not high enough on Windows to guarantee unique timestamps for each message.
+        # Also, Azure Monitor truncates resolution to microseconds, and some other backends truncate to milliseconds.
+        #
+        # But we need to give users a way to restore event order, so we're incrementing the timestamp
+        # by 1 microsecond for each message.
+        #
+        # This is a workaround; we'll find a better, more generic solution - see
+        # https://github.com/open-telemetry/semantic-conventions/issues/1701
+        if last_event_timestamp_ns > 0 and timestamp <= (last_event_timestamp_ns + 1000):
+            timestamp = last_event_timestamp_ns + 1000
+
+        span.span_instance.add_event(name=name,
+                                     attributes=attributes,
+                                     timestamp=timestamp)
+
+        return timestamp
+
     def _trace_sync_function(
         self,
         function: Callable,
@@ -580,16 +603,16 @@ def inner(*args, **kwargs):
                 name=span_name,
                 kind=SpanKind.CLIENT,  # pyright: ignore [reportPossiblyUnboundVariable]
             )
+
             try:
                 # tracing events not supported in azure-core-tracing-opentelemetry
                 # so need to access the span instance directly
                 with span_impl_type.change_context(span.span_instance):
-                    self._add_request_span_attributes(span, span_name, args, kwargs)
+                    last_event_timestamp_ns = self._add_request_details(span, args, kwargs)
                     result = function(*args, **kwargs)
                     if kwargs.get("stream") is True:
-                        return self._wrapped_stream(result, span)
-                    self._add_response_span_attributes(span, result)
-
+                        return self._wrapped_stream(result, span, last_event_timestamp_ns)
+                    self._add_response_details(span, result, last_event_timestamp_ns)
             except Exception as exc:
                 # Set the span status to error
                 if isinstance(span.span_instance, Span):  # pyright: ignore [reportPossiblyUnboundVariable]
@@ -659,11 +682,11 @@ async def inner(*args, **kwargs):
                 # tracing events not supported in azure-core-tracing-opentelemetry
                 # so need to access the span instance directly
                 with span_impl_type.change_context(span.span_instance):
-                    self._add_request_span_attributes(span, span_name, args, kwargs)
+                    last_event_timestamp_ns = self._add_request_details(span, args, kwargs)
                     result = await function(*args, **kwargs)
                     if kwargs.get("stream") is True:
-                        return self._async_wrapped_stream(result, span)
-                    self._add_response_span_attributes(span, result)
+                        return self._async_wrapped_stream(result, span, last_event_timestamp_ns)
+                    self._add_response_details(span, result, last_event_timestamp_ns)

             except Exception as exc:
                 # Set the span status to error

diff --git a/sdk/ai/azure-ai-inference/tests/gen_ai_trace_verifier.py b/sdk/ai/azure-ai-inference/tests/gen_ai_trace_verifier.py
index a105b60cf8ac..c40241a369da 100644
--- a/sdk/ai/azure-ai-inference/tests/gen_ai_trace_verifier.py
+++ b/sdk/ai/azure-ai-inference/tests/gen_ai_trace_verifier.py
@@ -106,4 +106,11 @@ def check_span_events(self, span, expected_events):
         if len(span_events) > 0:  # If there are any additional events in the span_events
             return False

+        prev_event = None
+        for actual_event in list(span.events):
+            if prev_event is not None and actual_event.timestamp <= prev_event.timestamp:
+                print(f"Event {actual_event.name} has timestamp {actual_event.timestamp}, which is not greater than the previous event's ({prev_event.name}) timestamp {prev_event.timestamp}")
+                return False
+            prev_event = actual_event
+
         return True

From 762e2e7adb88717465637f2358c5d4e9508db109 Mon Sep 17 00:00:00 2001
From: Liudmila Molkova
Date: Thu, 19 Dec 2024 16:17:05 -0800
Subject: [PATCH 3/3] lint

---
 .../azure/ai/inference/tracing.py | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/sdk/ai/azure-ai-inference/azure/ai/inference/tracing.py b/sdk/ai/azure-ai-inference/azure/ai/inference/tracing.py
index eb2b144fbeea..62b18c1a34aa 100644
--- a/sdk/ai/azure-ai-inference/azure/ai/inference/tracing.py
+++ b/sdk/ai/azure-ai-inference/azure/ai/inference/tracing.py
@@ -285,7 +285,9 @@ def _get_finish_reason_for_choice(self, choice):
         return "none"

-    def _add_response_chat_message_events(self, span: "AbstractSpan", result: _models.ChatCompletions, last_event_timestamp_ns: int) -> None:
+    def _add_response_chat_message_events(self, span: "AbstractSpan",
+        result: _models.ChatCompletions, last_event_timestamp_ns: int
+    ) -> None:
         for choice in result.choices:
             attributes = {}
             if _trace_inference_content:
@@ -532,13 +534,20 @@ def _trace_stream_content(self) -> None:
                         "gen_ai.system": _INFERENCE_GEN_AI_SYSTEM_NAME,
                         "gen_ai.event.content": json.dumps(self._accumulate),
                     }
-                    self._previous_event_timestamp = self._instrumentor._record_event(span, "gen_ai.choice", attributes, self._last_event_timestamp_ns)
+                    self._last_event_timestamp_ns = self._instrumentor._record_event(  # pylint: disable=protected-access, line-too-long # pyright: ignore [reportFunctionMemberAccess]
+                        span,
+                        "gen_ai.choice",
+                        attributes,
+                        self._last_event_timestamp_ns
+                    )
                 span.finish()

         async_stream_wrapper = AsyncStreamWrapper(stream_obj, self, span, last_event_timestamp_ns)
         return async_stream_wrapper

-    def _record_event(self, span: "AbstractSpan", name: str, attributes: Dict[str, Any], last_event_timestamp_ns: int) -> int:
+    def _record_event(self, span: "AbstractSpan", name: str,
+        attributes: Dict[str, Any], last_event_timestamp_ns: int
+    ) -> int:
         timestamp = time_ns()

         # We're recording multiple events; some of them are emitted within (hundreds of) nanoseconds of each other.
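
For reference, the timestamp workaround that patch 2 introduces in `_record_event`, and that patch 3's test verifier enforces, can be shown in isolation. The sketch below is illustrative only and not part of the patch series; `FakeSpan`, `record_event`, and the `__main__` demo loop are hypothetical stand-ins for `span.span_instance` and the instrumentor, so it runs without an OpenTelemetry exporter configured.

from time import time_ns
from typing import Any, Dict, List, Tuple


class FakeSpan:
    """Hypothetical stand-in for an OpenTelemetry span; it just collects events."""

    def __init__(self) -> None:
        self.events: List[Tuple[str, Dict[str, Any], int]] = []

    def add_event(self, name: str, attributes: Dict[str, Any], timestamp: int) -> None:
        self.events.append((name, attributes, timestamp))


def record_event(span: FakeSpan, name: str, attributes: Dict[str, Any], last_event_timestamp_ns: int) -> int:
    # time_ns() can return the same value for back-to-back calls (notably on
    # Windows), and backends such as Azure Monitor truncate timestamps to
    # microseconds, so consecutive events could otherwise collapse onto one
    # timestamp and lose their order. Bumping by a full microsecond (1000 ns)
    # keeps events distinct even after truncation.
    timestamp = time_ns()
    if last_event_timestamp_ns > 0 and timestamp <= last_event_timestamp_ns + 1000:
        timestamp = last_event_timestamp_ns + 1000
    span.add_event(name=name, attributes=attributes, timestamp=timestamp)
    return timestamp


if __name__ == "__main__":
    span = FakeSpan()
    last = 0
    # Emit several events back to back; each call threads the previous
    # timestamp through, the same way the instrumentor passes
    # last_event_timestamp_ns from request events to response events.
    for i in range(5):
        last = record_event(span, "gen_ai.user.message", {"index": i}, last)
    timestamps = [ts for _, _, ts in span.events]
    # Mirrors the check_span_events assertion: timestamps strictly increase.
    assert all(a < b for a, b in zip(timestamps, timestamps[1:]))
    print(timestamps)

The 1000 ns increment matches the microsecond granularity that Azure Monitor preserves; backends that truncate to milliseconds can still collapse events, which is why the in-code comment points to the open semantic-conventions issue rather than presenting this as a final design.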