Skip to content

Commit 2d41bc7

Browse files
committed
Stamp gen ai refs on spans and logs
1 parent ee947ce commit 2d41bc7

File tree

3 files changed

+119
-9
lines changed

3 files changed

+119
-9
lines changed

util/opentelemetry-util-genai/CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## Unreleased
99

10+
- Add upload hook to genai utils to implement semconv v1.37.
11+
12+
The hook uses [`fsspec`](https://filesystem-spec.readthedocs.io/en/latest/) to support
13+
various pluggable backends.
14+
([#3752](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3752))
15+
([#3759](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3759))
16+
([#3763](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3763))
1017
- Add a utility to parse the `OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT` environment variable.
1118
Add `gen_ai_latest_experimental` as a new value to the Sem Conv stability flag ([#3716](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3716)).

util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,32 @@
1919
import logging
2020
import posixpath
2121
import threading
22+
from collections.abc import Mapping
2223
from concurrent.futures import Future, ThreadPoolExecutor
2324
from dataclasses import asdict, dataclass
2425
from functools import partial
25-
from typing import Any, Callable, Literal, TextIO, cast
26+
from typing import Any, Callable, Final, Literal, TextIO, cast
2627
from uuid import uuid4
2728

2829
import fsspec
2930

3031
from opentelemetry._logs import LogRecord
32+
from opentelemetry.semconv._incubating.attributes import gen_ai_attributes
3133
from opentelemetry.trace import Span
3234
from opentelemetry.util.genai import types
3335
from opentelemetry.util.genai.upload_hook import UploadHook
3436

37+
GEN_AI_INPUT_MESSAGES_REF: Final = (
38+
gen_ai_attributes.GEN_AI_INPUT_MESSAGES + "_ref"
39+
)
40+
GEN_AI_OUTPUT_MESSAGES_REF: Final = (
41+
gen_ai_attributes.GEN_AI_OUTPUT_MESSAGES + "_ref"
42+
)
43+
GEN_AI_SYSTEM_INSTRUCTIONS_REF: Final = (
44+
gen_ai_attributes.GEN_AI_SYSTEM_INSTRUCTIONS + "_ref"
45+
)
46+
47+
3548
_logger = logging.getLogger(__name__)
3649

3750

@@ -177,7 +190,27 @@ def to_dict(
177190
},
178191
)
179192

180-
# TODO: stamp the refs on telemetry
193+
# stamp the refs on telemetry
194+
references = {
195+
GEN_AI_INPUT_MESSAGES_REF: ref_names.inputs_ref,
196+
GEN_AI_OUTPUT_MESSAGES_REF: ref_names.outputs_ref,
197+
GEN_AI_SYSTEM_INSTRUCTIONS_REF: ref_names.system_instruction_ref,
198+
}
199+
if span:
200+
span.set_attributes(references)
201+
if log_record:
202+
# set in both attributes and body until they are consolidated
203+
# https://github.com/open-telemetry/semantic-conventions/issues/1870
204+
log_record.attributes = {
205+
**(log_record.attributes or {}),
206+
**references,
207+
}
208+
209+
if log_record.body is None or isinstance(log_record.body, Mapping):
210+
log_record.body = {
211+
**(log_record.body or {}),
212+
**references,
213+
}
181214

182215
def shutdown(self) -> None:
183216
# TODO: support timeout

util/opentelemetry-util-genai/tests/test_fsspec_upload.py

Lines changed: 77 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
from unittest.mock import MagicMock, patch
2626

2727
import fsspec
28-
from fsspec.implementations.memory import MemoryFileSystem
2928

29+
from opentelemetry._logs import LogRecord
3030
from opentelemetry.test.test_base import TestBase
3131
from opentelemetry.util.genai import types
3232
from opentelemetry.util.genai._fsspec_upload.fsspec_hook import (
@@ -200,9 +200,6 @@ def test_upload(self):
200200

201201

202202
class TestFsspecUploadHookIntegration(TestBase):
203-
def setUp(self):
204-
MemoryFileSystem.store.clear()
205-
206203
def assert_fsspec_equal(self, path: str, value: str) -> None:
207204
with fsspec.open(path, "r") as file:
208205
self.assertEqual(file.read(), value)
@@ -211,13 +208,86 @@ def test_upload_completions(self):
211208
hook = FsspecUploadHook(
212209
base_path=BASE_PATH,
213210
)
211+
tracer = self.tracer_provider.get_tracer(__name__)
212+
log_record = LogRecord()
213+
214+
with tracer.start_as_current_span("chat mymodel") as span:
215+
hook.upload(
216+
inputs=FAKE_INPUTS,
217+
outputs=FAKE_OUTPUTS,
218+
system_instruction=FAKE_SYSTEM_INSTRUCTION,
219+
span=span,
220+
log_record=log_record,
221+
)
222+
hook.shutdown()
223+
224+
finished_spans = self.get_finished_spans()
225+
self.assertEqual(len(finished_spans), 1)
226+
span = finished_spans[0]
227+
228+
# span attributes, log attributes, and log body have refs
229+
for attributes in [
230+
span.attributes,
231+
log_record.attributes,
232+
log_record.body,
233+
]:
234+
for ref_key in [
235+
"gen_ai.input.messages_ref",
236+
"gen_ai.output.messages_ref",
237+
"gen_ai.system_instructions_ref",
238+
]:
239+
self.assertIn(ref_key, attributes)
240+
241+
self.assert_fsspec_equal(
242+
span.attributes["gen_ai.input.messages_ref"],
243+
'[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"}]}]',
244+
)
245+
self.assert_fsspec_equal(
246+
span.attributes["gen_ai.output.messages_ref"],
247+
'[{"role":"assistant","parts":[{"content":"Paris","type":"text"}],"finish_reason":"stop"}]',
248+
)
249+
self.assert_fsspec_equal(
250+
span.attributes["gen_ai.system_instructions_ref"],
251+
'[{"content":"You are a helpful assistant.","type":"text"}]',
252+
)
253+
254+
@staticmethod
255+
def upload_with_log(log_record: LogRecord):
256+
hook = FsspecUploadHook(
257+
base_path=BASE_PATH,
258+
)
259+
214260
hook.upload(
215261
inputs=FAKE_INPUTS,
216262
outputs=FAKE_OUTPUTS,
217263
system_instruction=FAKE_SYSTEM_INSTRUCTION,
264+
log_record=log_record,
218265
)
219266
hook.shutdown()
220267

221-
fs = fsspec.open(BASE_PATH).fs
222-
self.assertEqual(len(fs.ls(BASE_PATH)), 3)
223-
# TODO: test stamped telemetry
268+
def test_stamps_empty_log(self):
269+
log_record = LogRecord()
270+
self.upload_with_log(log_record)
271+
272+
# stamp on both body and attributes
273+
self.assertIn("gen_ai.input.messages_ref", log_record.attributes)
274+
self.assertIn("gen_ai.output.messages_ref", log_record.attributes)
275+
self.assertIn("gen_ai.system_instructions_ref", log_record.attributes)
276+
self.assertIn("gen_ai.input.messages_ref", log_record.body)
277+
self.assertIn("gen_ai.output.messages_ref", log_record.body)
278+
self.assertIn("gen_ai.system_instructions_ref", log_record.body)
279+
280+
def test_stamps_log_with_map_body(self):
281+
log_record = LogRecord(body={"hello": "world"})
282+
self.upload_with_log(log_record)
283+
284+
# stamp on both body and attributes, preserving existing
285+
self.assertEqual(log_record.body["hello"], "world")
286+
self.assertIn("gen_ai.input.messages_ref", log_record.body)
287+
self.assertIn("gen_ai.output.messages_ref", log_record.body)
288+
self.assertIn("gen_ai.system_instructions_ref", log_record.body)
289+
290+
def test_doesnt_stamp_log_string_body(self):
291+
log_record = LogRecord(body="hello world")
292+
self.upload_with_log(log_record)
293+
self.assertEqual(log_record.body, "hello world")

0 commit comments

Comments
 (0)