diff --git a/util/opentelemetry-util-genai/CHANGELOG.md b/util/opentelemetry-util-genai/CHANGELOG.md index 5d99eb53ec..bfd4c4daab 100644 --- a/util/opentelemetry-util-genai/CHANGELOG.md +++ b/util/opentelemetry-util-genai/CHANGELOG.md @@ -7,5 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- Add upload hook to genai utils to implement semconv v1.37. + + The hook uses [`fsspec`](https://filesystem-spec.readthedocs.io/en/latest/) to support + various pluggable backends. + ([#3752](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3752)) + ([#3759](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3752)) + ([#3763](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3763)) - Add a utility to parse the `OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT` environment variable. Add `gen_ai_latest_experimental` as a new value to the Sem Conv stability flag ([#3716](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3716)). diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py index 9bfbc864f0..319b5252cb 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py @@ -22,16 +22,28 @@ from concurrent.futures import Future, ThreadPoolExecutor from dataclasses import asdict, dataclass from functools import partial -from typing import Any, Callable, Literal, TextIO, cast +from typing import Any, Callable, Final, Literal, TextIO, cast from uuid import uuid4 import fsspec from opentelemetry._logs import LogRecord +from opentelemetry.semconv._incubating.attributes import gen_ai_attributes from opentelemetry.trace import Span from opentelemetry.util.genai import types from opentelemetry.util.genai.upload_hook import UploadHook +GEN_AI_INPUT_MESSAGES_REF: Final = ( + gen_ai_attributes.GEN_AI_INPUT_MESSAGES + "_ref" +) +GEN_AI_OUTPUT_MESSAGES_REF: Final = ( + gen_ai_attributes.GEN_AI_OUTPUT_MESSAGES + "_ref" +) +GEN_AI_SYSTEM_INSTRUCTIONS_REF: Final = ( + gen_ai_attributes.GEN_AI_SYSTEM_INSTRUCTIONS + "_ref" +) + + _logger = logging.getLogger(__name__) @@ -177,7 +189,19 @@ def to_dict( }, ) - # TODO: stamp the refs on telemetry + # stamp the refs on telemetry + references = { + GEN_AI_INPUT_MESSAGES_REF: ref_names.inputs_ref, + GEN_AI_OUTPUT_MESSAGES_REF: ref_names.outputs_ref, + GEN_AI_SYSTEM_INSTRUCTIONS_REF: ref_names.system_instruction_ref, + } + if span: + span.set_attributes(references) + if log_record: + log_record.attributes = { + **(log_record.attributes or {}), + **references, + } def shutdown(self) -> None: # TODO: support timeout diff --git a/util/opentelemetry-util-genai/tests/test_fsspec_upload.py b/util/opentelemetry-util-genai/tests/test_fsspec_upload.py index de55e28263..670a3b9342 100644 --- a/util/opentelemetry-util-genai/tests/test_fsspec_upload.py +++ b/util/opentelemetry-util-genai/tests/test_fsspec_upload.py @@ -25,8 +25,8 @@ from unittest.mock import MagicMock, patch import fsspec -from fsspec.implementations.memory import MemoryFileSystem +from opentelemetry._logs import LogRecord from opentelemetry.test.test_base import TestBase from opentelemetry.util.genai import types from opentelemetry.util.genai._fsspec_upload.fsspec_hook import ( @@ -200,9 +200,6 @@ def test_upload(self): class TestFsspecUploadHookIntegration(TestBase): - def setUp(self): - MemoryFileSystem.store.clear() - def assert_fsspec_equal(self, path: str, value: str) -> None: with fsspec.open(path, "r") as file: self.assertEqual(file.read(), value) @@ -211,13 +208,67 @@ def test_upload_completions(self): hook = FsspecUploadHook( base_path=BASE_PATH, ) + tracer = self.tracer_provider.get_tracer(__name__) + log_record = LogRecord() + + with tracer.start_as_current_span("chat mymodel") as span: + hook.upload( + inputs=FAKE_INPUTS, + outputs=FAKE_OUTPUTS, + system_instruction=FAKE_SYSTEM_INSTRUCTION, + span=span, + log_record=log_record, + ) + hook.shutdown() + + finished_spans = self.get_finished_spans() + self.assertEqual(len(finished_spans), 1) + span = finished_spans[0] + + # span attributes, log attributes, and log body have refs + for attributes in [ + span.attributes, + log_record.attributes, + ]: + for ref_key in [ + "gen_ai.input.messages_ref", + "gen_ai.output.messages_ref", + "gen_ai.system_instructions_ref", + ]: + self.assertIn(ref_key, attributes) + + self.assert_fsspec_equal( + span.attributes["gen_ai.input.messages_ref"], + '[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"}]}]', + ) + self.assert_fsspec_equal( + span.attributes["gen_ai.output.messages_ref"], + '[{"role":"assistant","parts":[{"content":"Paris","type":"text"}],"finish_reason":"stop"}]', + ) + self.assert_fsspec_equal( + span.attributes["gen_ai.system_instructions_ref"], + '[{"content":"You are a helpful assistant.","type":"text"}]', + ) + + @staticmethod + def upload_with_log(log_record: LogRecord): + hook = FsspecUploadHook( + base_path=BASE_PATH, + ) + hook.upload( inputs=FAKE_INPUTS, outputs=FAKE_OUTPUTS, system_instruction=FAKE_SYSTEM_INSTRUCTION, + log_record=log_record, ) hook.shutdown() - fs = fsspec.open(BASE_PATH).fs - self.assertEqual(len(fs.ls(BASE_PATH)), 3) - # TODO: test stamped telemetry + def test_stamps_empty_log(self): + log_record = LogRecord() + self.upload_with_log(log_record) + + # stamp on both body and attributes + self.assertIn("gen_ai.input.messages_ref", log_record.attributes) + self.assertIn("gen_ai.output.messages_ref", log_record.attributes) + self.assertIn("gen_ai.system_instructions_ref", log_record.attributes)