Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions util/opentelemetry-util-genai/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Add upload hook to genai utils to implement semconv v1.37.

The hook uses [`fsspec`](https://filesystem-spec.readthedocs.io/en/latest/) to support
various pluggable backends.
([#3752](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3752))
([#3759](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3759))
([#3763](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3763))
- Add a utility to parse the `OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT` environment variable.
Add `gen_ai_latest_experimental` as a new value to the Sem Conv stability flag ([#3716](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3716)).
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,28 @@
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import asdict, dataclass
from functools import partial
from typing import Any, Callable, Literal, TextIO, cast
from typing import Any, Callable, Final, Literal, TextIO, cast
from uuid import uuid4

import fsspec

from opentelemetry._logs import LogRecord
from opentelemetry.semconv._incubating.attributes import gen_ai_attributes
from opentelemetry.trace import Span
from opentelemetry.util.genai import types
from opentelemetry.util.genai.upload_hook import UploadHook

# Attribute keys used to stamp upload references on telemetry. Each key is the
# matching GenAI semantic-convention attribute name with a "_ref" suffix.
GEN_AI_INPUT_MESSAGES_REF: Final = (
    f"{gen_ai_attributes.GEN_AI_INPUT_MESSAGES}_ref"
)
GEN_AI_OUTPUT_MESSAGES_REF: Final = (
    f"{gen_ai_attributes.GEN_AI_OUTPUT_MESSAGES}_ref"
)
GEN_AI_SYSTEM_INSTRUCTIONS_REF: Final = (
    f"{gen_ai_attributes.GEN_AI_SYSTEM_INSTRUCTIONS}_ref"
)


_logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -177,7 +189,19 @@ def to_dict(
},
)

# TODO: stamp the refs on telemetry
# stamp the refs on telemetry
references = {
GEN_AI_INPUT_MESSAGES_REF: ref_names.inputs_ref,
GEN_AI_OUTPUT_MESSAGES_REF: ref_names.outputs_ref,
GEN_AI_SYSTEM_INSTRUCTIONS_REF: ref_names.system_instruction_ref,
}
if span:
span.set_attributes(references)
if log_record:
log_record.attributes = {
**(log_record.attributes or {}),
**references,
}

def shutdown(self) -> None:
# TODO: support timeout
Expand Down
65 changes: 58 additions & 7 deletions util/opentelemetry-util-genai/tests/test_fsspec_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
from unittest.mock import MagicMock, patch

import fsspec
from fsspec.implementations.memory import MemoryFileSystem

from opentelemetry._logs import LogRecord
from opentelemetry.test.test_base import TestBase
from opentelemetry.util.genai import types
from opentelemetry.util.genai._fsspec_upload.fsspec_hook import (
Expand Down Expand Up @@ -200,9 +200,6 @@ def test_upload(self):


class TestFsspecUploadHookIntegration(TestBase):
def setUp(self):
MemoryFileSystem.store.clear()

def assert_fsspec_equal(self, path: str, value: str) -> None:
    """Assert that the text read from ``path`` through fsspec equals ``value``."""
    with fsspec.open(path, "r") as handle:
        contents = handle.read()
    self.assertEqual(contents, value)
Expand All @@ -211,13 +208,67 @@ def test_upload_completions(self):
hook = FsspecUploadHook(
base_path=BASE_PATH,
)
tracer = self.tracer_provider.get_tracer(__name__)
log_record = LogRecord()

with tracer.start_as_current_span("chat mymodel") as span:
hook.upload(
inputs=FAKE_INPUTS,
outputs=FAKE_OUTPUTS,
system_instruction=FAKE_SYSTEM_INSTRUCTION,
span=span,
log_record=log_record,
)
hook.shutdown()

finished_spans = self.get_finished_spans()
self.assertEqual(len(finished_spans), 1)
span = finished_spans[0]

# span attributes and log attributes have refs
for attributes in [
span.attributes,
log_record.attributes,
]:
for ref_key in [
"gen_ai.input.messages_ref",
"gen_ai.output.messages_ref",
"gen_ai.system_instructions_ref",
]:
self.assertIn(ref_key, attributes)

self.assert_fsspec_equal(
span.attributes["gen_ai.input.messages_ref"],
'[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"}]}]',
)
self.assert_fsspec_equal(
span.attributes["gen_ai.output.messages_ref"],
'[{"role":"assistant","parts":[{"content":"Paris","type":"text"}],"finish_reason":"stop"}]',
)
self.assert_fsspec_equal(
span.attributes["gen_ai.system_instructions_ref"],
'[{"content":"You are a helpful assistant.","type":"text"}]',
)

@staticmethod
def upload_with_log(log_record: LogRecord):
    """Upload the fake completion with only ``log_record`` attached.

    Builds a hook rooted at ``BASE_PATH``, uploads the fake inputs, outputs
    and system instruction without a span, then shuts the hook down.
    """
    upload_hook = FsspecUploadHook(base_path=BASE_PATH)
    upload_hook.upload(
        inputs=FAKE_INPUTS,
        outputs=FAKE_OUTPUTS,
        system_instruction=FAKE_SYSTEM_INSTRUCTION,
        log_record=log_record,
    )
    upload_hook.shutdown()

fs = fsspec.open(BASE_PATH).fs
self.assertEqual(len(fs.ls(BASE_PATH)), 3)
# TODO: test stamped telemetry
def test_stamps_empty_log(self):
log_record = LogRecord()
self.upload_with_log(log_record)

# stamp on both body and attributes
self.assertIn("gen_ai.input.messages_ref", log_record.attributes)
self.assertIn("gen_ai.output.messages_ref", log_record.attributes)
self.assertIn("gen_ai.system_instructions_ref", log_record.attributes)