Skip to content

Commit 50f348f

Browse files
lmolkova authored and l0lawrence committed
Azure AI inference events timestamp in AzMon workaround (Azure#38955)
1 parent 4978228 commit 50f348f

File tree

3 files changed

+78
-40
lines changed

3 files changed

+78
-40
lines changed

sdk/ai/azure-ai-inference/azure/ai/inference/tracing.py

Lines changed: 70 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import importlib
1010
import logging
1111
import os
12+
from time import time_ns
1213
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union
1314
from urllib.parse import urlparse
1415

@@ -193,23 +194,27 @@ def _set_attributes(self, span: "AbstractSpan", *attrs: Tuple[str, Any]) -> None
193194
if value is not None:
194195
span.add_attribute(key, value)
195196

196-
def _add_request_chat_message_event(self, span: "AbstractSpan", **kwargs: Any) -> None:
197+
def _add_request_chat_message_events(self, span: "AbstractSpan", **kwargs: Any) -> int:
198+
timestamp = 0
197199
for message in kwargs.get("messages", []):
198200
try:
199201
message = message.as_dict()
200202
except AttributeError:
201203
pass
202204

203205
if message.get("role"):
204-
name = f"gen_ai.{message.get('role')}.message"
205-
span.span_instance.add_event(
206-
name=name,
207-
attributes={
206+
timestamp = self._record_event(
207+
span,
208+
f"gen_ai.{message.get('role')}.message",
209+
{
208210
"gen_ai.system": _INFERENCE_GEN_AI_SYSTEM_NAME,
209211
"gen_ai.event.content": json.dumps(message),
210212
},
213+
timestamp,
211214
)
212215

216+
return timestamp
217+
213218
def _parse_url(self, url):
214219
parsed = urlparse(url)
215220
server_address = parsed.hostname
@@ -280,8 +285,11 @@ def _get_finish_reason_for_choice(self, choice):
280285

281286
return "none"
282287

283-
def _add_response_chat_message_event(self, span: "AbstractSpan", result: _models.ChatCompletions) -> None:
288+
def _add_response_chat_message_events(self, span: "AbstractSpan",
289+
result: _models.ChatCompletions, last_event_timestamp_ns: int
290+
) -> None:
284291
for choice in result.choices:
292+
attributes = {}
285293
if _trace_inference_content:
286294
full_response: Dict[str, Any] = {
287295
"message": {"content": choice.message.content},
@@ -312,7 +320,7 @@ def _add_response_chat_message_event(self, span: "AbstractSpan", result: _models
312320
"gen_ai.system": _INFERENCE_GEN_AI_SYSTEM_NAME,
313321
"gen_ai.event.content": json.dumps(response),
314322
}
315-
span.span_instance.add_event(name="gen_ai.choice", attributes=attributes)
323+
last_event_timestamp_ns = self._record_event(span, "gen_ai.choice", attributes, last_event_timestamp_ns)
316324

317325
def _add_response_chat_attributes(
318326
self,
@@ -336,15 +344,16 @@ def _add_response_chat_attributes(
336344
if not finish_reasons is None:
337345
span.add_attribute("gen_ai.response.finish_reasons", finish_reasons) # type: ignore
338346

339-
def _add_request_span_attributes(self, span: "AbstractSpan", _span_name: str, args: Any, kwargs: Any) -> None:
347+
def _add_request_details(self, span: "AbstractSpan", args: Any, kwargs: Any) -> int:
340348
self._add_request_chat_attributes(span, *args, **kwargs)
341349
if _trace_inference_content:
342-
self._add_request_chat_message_event(span, **kwargs)
350+
return self._add_request_chat_message_events(span, **kwargs)
351+
return 0
343352

344-
def _add_response_span_attributes(self, span: "AbstractSpan", result: object) -> None:
353+
def _add_response_details(self, span: "AbstractSpan", result: object, last_event_timestamp_ns: int) -> None:
345354
if isinstance(result, _models.ChatCompletions):
346355
self._add_response_chat_attributes(span, result)
347-
self._add_response_chat_message_event(span, result)
356+
self._add_response_chat_message_events(span, result, last_event_timestamp_ns)
348357
# TODO add more models here
349358

350359
def _accumulate_response(self, item, accumulate: Dict[str, Any]) -> None:
@@ -410,7 +419,7 @@ def _accumulate_async_streaming_response(self, item, accumulate: Dict[str, Any])
410419
accumulate["message"]["tool_calls"][-1]["function"]["arguments"] += tool_call.function.arguments
411420

412421
def _wrapped_stream(
413-
self, stream_obj: _models.StreamingChatCompletions, span: "AbstractSpan"
422+
self, stream_obj: _models.StreamingChatCompletions, span: "AbstractSpan", previous_event_timestamp: int
414423
) -> _models.StreamingChatCompletions:
415424
class StreamWrapper(_models.StreamingChatCompletions):
416425
def __init__(self, stream_obj, instrumentor):
@@ -467,29 +476,27 @@ def __iter__( # pyright: ignore [reportIncompatibleMethodOverride]
467476
accumulate["message"]["tool_calls"] = list(
468477
tool_calls_function_names_and_arguments_removed
469478
)
470-
471-
span.span_instance.add_event(
472-
name="gen_ai.choice",
473-
attributes={
474-
"gen_ai.system": _INFERENCE_GEN_AI_SYSTEM_NAME,
475-
"gen_ai.event.content": json.dumps(accumulate),
476-
},
477-
)
479+
attributes={
480+
"gen_ai.system": _INFERENCE_GEN_AI_SYSTEM_NAME,
481+
"gen_ai.event.content": json.dumps(accumulate),
482+
}
483+
self._instrumentor._record_event(span, "gen_ai.choice", attributes, previous_event_timestamp)
478484
span.finish()
479485

480486
return StreamWrapper(stream_obj, self)
481487

482488
def _async_wrapped_stream(
483-
self, stream_obj: _models.AsyncStreamingChatCompletions, span: "AbstractSpan"
489+
self, stream_obj: _models.AsyncStreamingChatCompletions, span: "AbstractSpan", last_event_timestamp_ns: int
484490
) -> _models.AsyncStreamingChatCompletions:
485491
class AsyncStreamWrapper(_models.AsyncStreamingChatCompletions):
486-
def __init__(self, stream_obj, instrumentor, span):
492+
def __init__(self, stream_obj, instrumentor, span, last_event_timestamp_ns):
487493
super().__init__(stream_obj._response)
488494
self._instrumentor = instrumentor
489495
self._accumulate: Dict[str, Any] = {}
490496
self._stream_obj = stream_obj
491497
self.span = span
492498
self._last_result = None
499+
self._last_event_timestamp_ns = last_event_timestamp_ns
493500

494501
async def __anext__(self) -> "_models.StreamingChatCompletionsUpdate":
495502
try:
@@ -523,19 +530,44 @@ def _trace_stream_content(self) -> None:
523530
self._accumulate["message"]["tool_calls"]
524531
)
525532
self._accumulate["message"]["tool_calls"] = list(tools_no_recording)
526-
527-
self.span.span_instance.add_event(
528-
name="gen_ai.choice",
529-
attributes={
530-
"gen_ai.system": _INFERENCE_GEN_AI_SYSTEM_NAME,
531-
"gen_ai.event.content": json.dumps(self._accumulate),
532-
},
533-
)
533+
attributes={
534+
"gen_ai.system": _INFERENCE_GEN_AI_SYSTEM_NAME,
535+
"gen_ai.event.content": json.dumps(self._accumulate),
536+
}
537+
self._last_event_timestamp_ns = self._instrumentor._record_event( # pylint: disable=protected-access, line-too-long # pyright: ignore [reportFunctionMemberAccess]
538+
span,
539+
"gen_ai.choice",
540+
attributes,
541+
self._last_event_timestamp_ns
542+
)
534543
span.finish()
535544

536-
async_stream_wrapper = AsyncStreamWrapper(stream_obj, self, span)
545+
async_stream_wrapper = AsyncStreamWrapper(stream_obj, self, span, last_event_timestamp_ns)
537546
return async_stream_wrapper
538547

548+
def _record_event(self, span: "AbstractSpan", name: str,
549+
attributes: Dict[str, Any], last_event_timestamp_ns: int
550+
) -> int:
551+
timestamp = time_ns()
552+
553+
# we're recording multiple events, some of them are emitted within (hundreds of) nanoseconds of each other.
554+
# time.time_ns resolution is not high enough on windows to guarantee unique timestamps for each message.
555+
# Also Azure Monitor truncates resolution to microseconds and some other backends truncate to milliseconds.
556+
#
557+
# But we need to give users a way to restore event order, so we're incrementing the timestamp
558+
# by 1 microsecond for each message.
559+
#
560+
# This is a workaround, we'll find a generic and better solution - see
561+
# https://github.com/open-telemetry/semantic-conventions/issues/1701
562+
if last_event_timestamp_ns > 0 and timestamp <= (last_event_timestamp_ns + 1000):
563+
timestamp = last_event_timestamp_ns + 1000
564+
565+
span.span_instance.add_event(name=name,
566+
attributes=attributes,
567+
timestamp=timestamp)
568+
569+
return timestamp
570+
539571
def _trace_sync_function(
540572
self,
541573
function: Callable,
@@ -580,16 +612,16 @@ def inner(*args, **kwargs):
580612
name=span_name,
581613
kind=SpanKind.CLIENT, # pyright: ignore [reportPossiblyUnboundVariable]
582614
)
615+
583616
try:
584617
# tracing events not supported in azure-core-tracing-opentelemetry
585618
# so need to access the span instance directly
586619
with span_impl_type.change_context(span.span_instance):
587-
self._add_request_span_attributes(span, span_name, args, kwargs)
620+
last_event_timestamp_ns = self._add_request_details(span, args, kwargs)
588621
result = function(*args, **kwargs)
589622
if kwargs.get("stream") is True:
590-
return self._wrapped_stream(result, span)
591-
self._add_response_span_attributes(span, result)
592-
623+
return self._wrapped_stream(result, span, last_event_timestamp_ns)
624+
self._add_response_details(span, result, last_event_timestamp_ns)
593625
except Exception as exc:
594626
# Set the span status to error
595627
if isinstance(span.span_instance, Span): # pyright: ignore [reportPossiblyUnboundVariable]
@@ -659,11 +691,11 @@ async def inner(*args, **kwargs):
659691
# tracing events not supported in azure-core-tracing-opentelemetry
660692
# so need to access the span instance directly
661693
with span_impl_type.change_context(span.span_instance):
662-
self._add_request_span_attributes(span, span_name, args, kwargs)
694+
last_event_timestamp_ns = self._add_request_details(span, args, kwargs)
663695
result = await function(*args, **kwargs)
664696
if kwargs.get("stream") is True:
665-
return self._async_wrapped_stream(result, span)
666-
self._add_response_span_attributes(span, result)
697+
return self._async_wrapped_stream(result, span, last_event_timestamp_ns)
698+
self._add_response_details(span, result, last_event_timestamp_ns)
667699

668700
except Exception as exc:
669701
# Set the span status to error

sdk/ai/azure-ai-inference/samples/sample_chat_completions_with_tracing.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@
2323
where `your-deployment-name` is your unique AI Model deployment name, and
2424
`your-azure-region` is the Azure region where your model is deployed.
2525
2) AZURE_AI_CHAT_KEY - Your model key (a 32-character string). Keep it secret.
26-
3) AZURE_TRACING_GEN_AI_CONTENT_RECORDING_ENABLED - Optional. Set to 'true'
27-
for detailed traces, including chat request and response messages.
26+
3) AZURE_TRACING_GEN_AI_CONTENT_RECORDING_ENABLED - Set to 'true' to enable content recording.
2827
"""
2928

3029

sdk/ai/azure-ai-inference/tests/gen_ai_trace_verifier.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,4 +106,11 @@ def check_span_events(self, span, expected_events):
106106
if len(span_events) > 0: # If there are any additional events in the span_events
107107
return False
108108

109+
prev_event = None
110+
for actual_event in list(span.events):
111+
if prev_event is not None and actual_event.timestamp <= prev_event.timestamp:
112+
print(f"Event {actual_event.name} has a timestamp {actual_event.timestamp} that is not greater than the previous event's timestamp {prev_event.timestamp}, {prev_event.name}")
113+
return False
114+
prev_event = actual_event
115+
109116
return True

0 commit comments

Comments
 (0)