From 2d41bc7d809c4eb4462c214ea68636a79d1fccae Mon Sep 17 00:00:00 2001 From: Aaron Abbott Date: Wed, 17 Sep 2025 20:14:36 +0000 Subject: [PATCH 1/3] Stamp gen ai refs on spans and logs --- util/opentelemetry-util-genai/CHANGELOG.md | 7 ++ .../util/genai/_fsspec_upload/fsspec_hook.py | 37 +++++++- .../tests/test_fsspec_upload.py | 84 +++++++++++++++++-- 3 files changed, 119 insertions(+), 9 deletions(-) diff --git a/util/opentelemetry-util-genai/CHANGELOG.md b/util/opentelemetry-util-genai/CHANGELOG.md index 5d99eb53ec..bfd4c4daab 100644 --- a/util/opentelemetry-util-genai/CHANGELOG.md +++ b/util/opentelemetry-util-genai/CHANGELOG.md @@ -7,5 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- Add upload hook to genai utils to implement semconv v1.37. + + The hook uses [`fsspec`](https://filesystem-spec.readthedocs.io/en/latest/) to support + various pluggable backends. + ([#3752](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3752)) + ([#3759](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3752)) + ([#3763](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3763)) - Add a utility to parse the `OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT` environment variable. Add `gen_ai_latest_experimental` as a new value to the Sem Conv stability flag ([#3716](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3716)). diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py index 9bfbc864f0..c7c2d5fcfe 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py @@ -19,19 +19,32 @@ import logging import posixpath import threading +from collections.abc import Mapping from concurrent.futures import Future, ThreadPoolExecutor from dataclasses import asdict, dataclass from functools import partial -from typing import Any, Callable, Literal, TextIO, cast +from typing import Any, Callable, Final, Literal, TextIO, cast from uuid import uuid4 import fsspec from opentelemetry._logs import LogRecord +from opentelemetry.semconv._incubating.attributes import gen_ai_attributes from opentelemetry.trace import Span from opentelemetry.util.genai import types from opentelemetry.util.genai.upload_hook import UploadHook +GEN_AI_INPUT_MESSAGES_REF: Final = ( + gen_ai_attributes.GEN_AI_INPUT_MESSAGES + "_ref" +) +GEN_AI_OUTPUT_MESSAGES_REF: Final = ( + gen_ai_attributes.GEN_AI_OUTPUT_MESSAGES + "_ref" +) +GEN_AI_SYSTEM_INSTRUCTIONS_REF: Final = ( + gen_ai_attributes.GEN_AI_SYSTEM_INSTRUCTIONS + "_ref" +) + + _logger = logging.getLogger(__name__) @@ -177,7 +190,27 @@ def to_dict( }, ) - # TODO: stamp the refs on telemetry + # stamp the refs on telemetry + references = { + GEN_AI_INPUT_MESSAGES_REF: ref_names.inputs_ref, + GEN_AI_OUTPUT_MESSAGES_REF: ref_names.outputs_ref, + GEN_AI_SYSTEM_INSTRUCTIONS_REF: ref_names.system_instruction_ref, + } + if span: + span.set_attributes(references) + if log_record: + # set in both attributes and body until they are consolidated + # https://github.com/open-telemetry/semantic-conventions/issues/1870 + log_record.attributes = { + **(log_record.attributes or {}), + **references, + } + + if log_record.body is None or isinstance(log_record.body, Mapping): + log_record.body = { + **(log_record.body or {}), + **references, + } def shutdown(self) -> None: # TODO: support timeout diff --git a/util/opentelemetry-util-genai/tests/test_fsspec_upload.py b/util/opentelemetry-util-genai/tests/test_fsspec_upload.py index de55e28263..684c4a9361 100644 --- a/util/opentelemetry-util-genai/tests/test_fsspec_upload.py +++ b/util/opentelemetry-util-genai/tests/test_fsspec_upload.py @@ -25,8 +25,8 @@ from unittest.mock import MagicMock, patch import fsspec -from fsspec.implementations.memory import MemoryFileSystem +from opentelemetry._logs import LogRecord from opentelemetry.test.test_base import TestBase from opentelemetry.util.genai import types from opentelemetry.util.genai._fsspec_upload.fsspec_hook import ( @@ -200,9 +200,6 @@ def test_upload(self): class TestFsspecUploadHookIntegration(TestBase): - def setUp(self): - MemoryFileSystem.store.clear() - def assert_fsspec_equal(self, path: str, value: str) -> None: with fsspec.open(path, "r") as file: self.assertEqual(file.read(), value) @@ -211,13 +208,86 @@ def test_upload_completions(self): hook = FsspecUploadHook( base_path=BASE_PATH, ) + tracer = self.tracer_provider.get_tracer(__name__) + log_record = LogRecord() + + with tracer.start_as_current_span("chat mymodel") as span: + hook.upload( + inputs=FAKE_INPUTS, + outputs=FAKE_OUTPUTS, + system_instruction=FAKE_SYSTEM_INSTRUCTION, + span=span, + log_record=log_record, + ) + hook.shutdown() + + finished_spans = self.get_finished_spans() + self.assertEqual(len(finished_spans), 1) + span = finished_spans[0] + + # span attributes, log attributes, and log body have refs + for attributes in [ + span.attributes, + log_record.attributes, + log_record.body, + ]: + for ref_key in [ + "gen_ai.input.messages_ref", + "gen_ai.output.messages_ref", + "gen_ai.system_instructions_ref", + ]: + self.assertIn(ref_key, attributes) + + self.assert_fsspec_equal( + span.attributes["gen_ai.input.messages_ref"], + '[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"}]}]', + ) + self.assert_fsspec_equal( + span.attributes["gen_ai.output.messages_ref"], + '[{"role":"assistant","parts":[{"content":"Paris","type":"text"}],"finish_reason":"stop"}]', + ) + self.assert_fsspec_equal( + span.attributes["gen_ai.system_instructions_ref"], + '[{"content":"You are a helpful assistant.","type":"text"}]', + ) + + @staticmethod + def upload_with_log(log_record: LogRecord): + hook = FsspecUploadHook( + base_path=BASE_PATH, + ) + hook.upload( inputs=FAKE_INPUTS, outputs=FAKE_OUTPUTS, system_instruction=FAKE_SYSTEM_INSTRUCTION, + log_record=log_record, ) hook.shutdown() - fs = fsspec.open(BASE_PATH).fs - self.assertEqual(len(fs.ls(BASE_PATH)), 3) - # TODO: test stamped telemetry + def test_stamps_empty_log(self): + log_record = LogRecord() + self.upload_with_log(log_record) + + # stamp on both body and attributes + self.assertIn("gen_ai.input.messages_ref", log_record.attributes) + self.assertIn("gen_ai.output.messages_ref", log_record.attributes) + self.assertIn("gen_ai.system_instructions_ref", log_record.attributes) + self.assertIn("gen_ai.input.messages_ref", log_record.body) + self.assertIn("gen_ai.output.messages_ref", log_record.body) + self.assertIn("gen_ai.system_instructions_ref", log_record.body) + + def test_stamps_log_with_map_body(self): + log_record = LogRecord(body={"hello": "world"}) + self.upload_with_log(log_record) + + # stamp on both body and attributes, preserving existing + self.assertEqual(log_record.body["hello"], "world") + self.assertIn("gen_ai.input.messages_ref", log_record.body) + self.assertIn("gen_ai.output.messages_ref", log_record.body) + self.assertIn("gen_ai.system_instructions_ref", log_record.body) + + def test_doesnt_stamp_log_string_body(self): + log_record = LogRecord(body="hello world") + self.upload_with_log(log_record) + self.assertEqual(log_record.body, "hello world") From 7c4600ccf1fc376cd95f928d0d75b29330bfcf61 Mon Sep 17 00:00:00 2001 From: Aaron Abbott Date: Thu, 18 Sep 2025 15:21:25 +0000 Subject: [PATCH 2/3] remove log body stamping --- .../util/genai/_fsspec_upload/fsspec_hook.py | 8 -------- .../tests/test_fsspec_upload.py | 19 ------------------- 2 files changed, 27 deletions(-) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py index c7c2d5fcfe..ad6ccb75f8 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py @@ -199,19 +199,11 @@ def to_dict( if span: span.set_attributes(references) if log_record: - # set in both attributes and body until they are consolidated - # https://github.com/open-telemetry/semantic-conventions/issues/1870 log_record.attributes = { **(log_record.attributes or {}), **references, } - if log_record.body is None or isinstance(log_record.body, Mapping): - log_record.body = { - **(log_record.body or {}), - **references, - } - def shutdown(self) -> None: # TODO: support timeout self._executor.shutdown() diff --git a/util/opentelemetry-util-genai/tests/test_fsspec_upload.py b/util/opentelemetry-util-genai/tests/test_fsspec_upload.py index 684c4a9361..670a3b9342 100644 --- a/util/opentelemetry-util-genai/tests/test_fsspec_upload.py +++ b/util/opentelemetry-util-genai/tests/test_fsspec_upload.py @@ -229,7 +229,6 @@ def test_upload_completions(self): for attributes in [ span.attributes, log_record.attributes, - log_record.body, ]: for ref_key in [ "gen_ai.input.messages_ref", @@ -273,21 +272,3 @@ def test_stamps_empty_log(self): self.assertIn("gen_ai.input.messages_ref", log_record.attributes) self.assertIn("gen_ai.output.messages_ref", log_record.attributes) self.assertIn("gen_ai.system_instructions_ref", log_record.attributes) - self.assertIn("gen_ai.input.messages_ref", log_record.body) - self.assertIn("gen_ai.output.messages_ref", log_record.body) - self.assertIn("gen_ai.system_instructions_ref", log_record.body) - - def test_stamps_log_with_map_body(self): - log_record = LogRecord(body={"hello": "world"}) - self.upload_with_log(log_record) - - # stamp on both body and attributes, preserving existing - self.assertEqual(log_record.body["hello"], "world") - self.assertIn("gen_ai.input.messages_ref", log_record.body) - self.assertIn("gen_ai.output.messages_ref", log_record.body) - self.assertIn("gen_ai.system_instructions_ref", log_record.body) - - def test_doesnt_stamp_log_string_body(self): - log_record = LogRecord(body="hello world") - self.upload_with_log(log_record) - self.assertEqual(log_record.body, "hello world") From 738b146697428799aae89a5510adc2671edfa0a2 Mon Sep 17 00:00:00 2001 From: Aaron Abbott Date: Thu, 18 Sep 2025 19:43:14 +0000 Subject: [PATCH 3/3] fix lint --- .../src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py | 1 - 1 file changed, 1 deletion(-) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py index ad6ccb75f8..319b5252cb 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py @@ -19,7 +19,6 @@ import logging import posixpath import threading -from collections.abc import Mapping from concurrent.futures import Future, ThreadPoolExecutor from dataclasses import asdict, dataclass from functools import partial