diff --git a/ddtrace/contrib/internal/anthropic/utils.py b/ddtrace/contrib/internal/anthropic/utils.py deleted file mode 100644 index 0721f33529d..00000000000 --- a/ddtrace/contrib/internal/anthropic/utils.py +++ /dev/null @@ -1,4 +0,0 @@ -from ddtrace.internal.logger import get_logger - - -log = get_logger(__name__) diff --git a/ddtrace/llmobs/_integrations/anthropic.py b/ddtrace/llmobs/_integrations/anthropic.py index 17b89fe00fb..ac506bcd97c 100644 --- a/ddtrace/llmobs/_integrations/anthropic.py +++ b/ddtrace/llmobs/_integrations/anthropic.py @@ -24,9 +24,10 @@ from ddtrace.llmobs._integrations.base import BaseLLMIntegration from ddtrace.llmobs._integrations.utils import update_proxy_workflow_input_output_value from ddtrace.llmobs._utils import _get_attr -from ddtrace.llmobs.utils import ToolCall -from ddtrace.llmobs.utils import ToolDefinition -from ddtrace.llmobs.utils import ToolResult +from ddtrace.llmobs.types import Message +from ddtrace.llmobs.types import ToolCall +from ddtrace.llmobs.types import ToolDefinition +from ddtrace.llmobs.types import ToolResult from ddtrace.trace import Span @@ -69,9 +70,9 @@ def _llmobs_set_tags( span._set_ctx_item(TOOL_DEFINITIONS, tools) messages = kwargs.get("messages") system_prompt = kwargs.get("system") - input_messages = self._extract_input_message(messages, system_prompt) + input_messages = self._extract_input_message(list(messages) if messages else [], system_prompt) - output_messages = [{"content": ""}] + output_messages: List[Message] = [Message(content="")] if not span.error and response is not None: output_messages = self._extract_output_message(response) span_kind = "workflow" if span._get_ctx_item(PROXY_REQUEST) else "llm" @@ -92,14 +93,16 @@ def _llmobs_set_tags( ) update_proxy_workflow_input_output_value(span, span_kind) - def _extract_input_message(self, messages, system_prompt: Optional[Union[str, List[Dict[str, Any]]]] = None): + def _extract_input_message( + self, messages: List[Dict[str, Any]], system_prompt: Optional[Union[str, List[Dict[str, Any]]]] = None + ) -> List[Message]: """Extract input messages from the stored prompt. Anthropic allows for messages and multiple texts in a message, which requires some special casing. """ if not isinstance(messages, Iterable): log.warning("Anthropic input must be a list of messages.") - input_messages = [] + input_messages: List[Message] = [] if system_prompt is not None: messages = [{"content": system_prompt, "role": "system"}] + messages @@ -115,16 +118,16 @@ def _extract_input_message(self, messages, system_prompt: Optional[Union[str, Li log.warning("Anthropic input message must have content and role.") if isinstance(content, str): - input_messages.append({"content": content, "role": role}) + input_messages.append(Message(content=content, role=str(role))) elif isinstance(content, list): for block in content: if _get_attr(block, "type", None) == "text": - input_messages.append({"content": _get_attr(block, "text", ""), "role": role}) + input_messages.append(Message(content=str(_get_attr(block, "text", "")), role=str(role))) elif _get_attr(block, "type", None) == "image": # Store a placeholder for potentially enormous binary image data. 
- input_messages.append({"content": "([IMAGE DETECTED])", "role": role}) + input_messages.append(Message(content="([IMAGE DETECTED])", role=str(role))) elif _get_attr(block, "type", None) == "tool_use": text = _get_attr(block, "text", None) @@ -132,26 +135,26 @@ def _extract_input_message(self, messages, system_prompt: Optional[Union[str, Li if isinstance(input_data, str): input_data = json.loads(input_data) tool_call_info = ToolCall( - name=_get_attr(block, "name", ""), + name=str(_get_attr(block, "name", "")), arguments=input_data, - tool_id=_get_attr(block, "id", ""), - type=_get_attr(block, "type", ""), + tool_id=str(_get_attr(block, "id", "")), + type=str(_get_attr(block, "type", "")), ) if text is None: text = "" - input_messages.append({"content": text, "role": role, "tool_calls": [tool_call_info]}) + input_messages.append(Message(content=str(text), role=str(role), tool_calls=[tool_call_info])) elif _get_attr(block, "type", None) == "tool_result": content = _get_attr(block, "content", None) formatted_content = self._format_tool_result_content(content) tool_result_info = ToolResult( result=formatted_content, - tool_id=_get_attr(block, "tool_use_id", ""), + tool_id=str(_get_attr(block, "tool_use_id", "")), type="tool_result", ) - input_messages.append({"content": "", "role": role, "tool_results": [tool_result_info]}) + input_messages.append(Message(content="", role=str(role), tool_results=[tool_result_info])) else: - input_messages.append({"content": str(block), "role": role}) + input_messages.append(Message(content=str(block), role=str(role))) return input_messages @@ -169,34 +172,33 @@ def _format_tool_result_content(self, content) -> str: return ",".join(formatted_content) return str(content) - def _extract_output_message(self, response): + def _extract_output_message(self, response) -> List[Message]: """Extract output messages from the stored response.""" - output_messages = [] + output_messages: List[Message] = [] content = _get_attr(response, "content", "") role = _get_attr(response, "role", "") if isinstance(content, str): - return [{"content": content, "role": role}] + return [Message(content=content, role=str(role))] elif isinstance(content, list): for completion in content: text = _get_attr(completion, "text", None) if isinstance(text, str): - output_messages.append({"content": text, "role": role}) - else: - if _get_attr(completion, "type", None) == "tool_use": - input_data = _get_attr(completion, "input", "") - if isinstance(input_data, str): - input_data = json.loads(input_data) - tool_call_info = ToolCall( - name=_get_attr(completion, "name", ""), - arguments=input_data, - tool_id=_get_attr(completion, "id", ""), - type=_get_attr(completion, "type", ""), - ) - if text is None: - text = "" - output_messages.append({"content": text, "role": role, "tool_calls": [tool_call_info]}) + output_messages.append(Message(content=text, role=str(role))) + elif _get_attr(completion, "type", None) == "tool_use": + input_data = _get_attr(completion, "input", "") + if isinstance(input_data, str): + input_data = json.loads(input_data) + tool_call_info = ToolCall( + name=str(_get_attr(completion, "name", "")), + arguments=input_data, + tool_id=str(_get_attr(completion, "id", "")), + type=str(_get_attr(completion, "type", "")), + ) + if text is None: + text = "" + output_messages.append(Message(content=str(text), role=str(role), tool_calls=[tool_call_info])) return output_messages def _extract_usage(self, span: Span, usage: Dict[str, Any]): diff --git a/ddtrace/llmobs/_integrations/bedrock.py 
b/ddtrace/llmobs/_integrations/bedrock.py index 083d19f9803..b6eb244e300 100644 --- a/ddtrace/llmobs/_integrations/bedrock.py +++ b/ddtrace/llmobs/_integrations/bedrock.py @@ -35,7 +35,8 @@ from ddtrace.llmobs._telemetry import record_bedrock_agent_span_event_created from ddtrace.llmobs._utils import _get_attr from ddtrace.llmobs._writer import LLMObsSpanEvent -from ddtrace.llmobs.utils import ToolDefinition +from ddtrace.llmobs.types import Message +from ddtrace.llmobs.types import ToolDefinition from ddtrace.trace import Span @@ -110,7 +111,7 @@ def _llmobs_set_tags( self._extract_input_message_for_converse(prompt) if is_converse else self._extract_input_message(prompt) ) - output_messages = [{"content": ""}] + output_messages: List[Message] = [Message(content="")] if not span.error and response is not None: if ctx["resource"] == "Converse": output_messages = self._extract_output_message_for_converse(response) @@ -191,7 +192,7 @@ def translate_bedrock_traces(self, traces, root_span) -> None: self._active_span_by_step_id.clear() @staticmethod - def _extract_input_message_for_converse(prompt: List[Dict[str, Any]]): + def _extract_input_message_for_converse(prompt: List[Dict[str, Any]]) -> List[Message]: """Extract input messages from the stored prompt for converse `prompt` is an array of `message` objects. Each `message` has a role and content field. @@ -203,8 +204,8 @@ def _extract_input_message_for_converse(prompt: List[Dict[str, Any]]): """ if not isinstance(prompt, list): log.warning("Bedrock input is not a list of messages or a string.") - return [{"content": ""}] - input_messages = [] + return [Message(content="")] + input_messages: List[Message] = [] for message in prompt: if not isinstance(message, dict): continue @@ -226,7 +227,7 @@ def _extract_output_message_for_converse(response: Dict[str, Any]): For more info, see bedrock converse response syntax: https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_Converse.html#API_runtime_Converse_ResponseSyntax """ - default_content = [{"content": ""}] + default_content: List[Message] = [Message(content="")] message = response.get("output", {}).get("message", {}) if not message: return default_content @@ -241,7 +242,7 @@ def _converse_output_stream_processor() -> ( Generator[ None, Dict[str, Any], - Tuple[List[Dict[str, Any]], Dict[str, str], Dict[str, int]], + Tuple[List[Message], Dict[str, str], Dict[str, int]], ] ): """ @@ -259,7 +260,7 @@ def _converse_output_stream_processor() -> ( """ usage_metrics: Dict[str, int] = {} metadata: Dict[str, str] = {} - messages: List[Dict[str, Any]] = [] + messages: List[Message] = [] text_content_blocks: Dict[int, str] = {} tool_content_blocks: Dict[int, Dict[str, Any]] = {} @@ -336,47 +337,48 @@ def _converse_output_stream_processor() -> ( ) if not messages: - messages.append({"role": "assistant", "content": ""}) + messages.append(Message(content="", role="assistant")) normalize_input_tokens(usage_metrics) return messages, metadata, usage_metrics @staticmethod - def _extract_input_message(prompt): + def _extract_input_message(prompt) -> List[Message]: """Extract input messages from the stored prompt. Anthropic allows for messages and multiple texts in a message, which requires some special casing. 
""" if isinstance(prompt, str): - return [{"content": prompt}] + return [Message(content=prompt)] if not isinstance(prompt, list): log.warning("Bedrock input is not a list of messages or a string.") - return [{"content": ""}] - input_messages = [] + return [Message(content="")] + input_messages: List[Message] = [] for p in prompt: content = p.get("content", "") if isinstance(content, list) and isinstance(content[0], dict): for entry in content: if entry.get("type") == "text": - input_messages.append({"content": entry.get("text", ""), "role": str(p.get("role", ""))}) + input_messages.append(Message(content=entry.get("text", ""), role=str(p.get("role", "")))) elif entry.get("type") == "image": # Store a placeholder for potentially enormous binary image data. - input_messages.append({"content": "([IMAGE DETECTED])", "role": str(p.get("role", ""))}) + input_messages.append(Message(content="([IMAGE DETECTED])", role=str(p.get("role", "")))) else: - input_messages.append({"content": content, "role": str(p.get("role", ""))}) + input_messages.append(Message(content=str(content), role=str(p.get("role", "")))) return input_messages @staticmethod - def _extract_output_message(response): + def _extract_output_message(response) -> List[Message]: """Extract output messages from the stored response. Anthropic allows for chat messages, which requires some special casing. """ if isinstance(response["text"], str): - return [{"content": response["text"]}] + return [Message(content=response["text"])] if isinstance(response["text"], list): if isinstance(response["text"][0], str): - return [{"content": str(content)} for content in response["text"]] + return [Message(content=str(content)) for content in response["text"]] if isinstance(response["text"][0], dict): - return [{"content": response["text"][0].get("text", "")}] + return [Message(content=response["text"][0].get("text", ""))] + return [] def _get_base_url(self, **kwargs: Dict[str, Any]) -> Optional[str]: instance = kwargs.get("instance") @@ -396,8 +398,8 @@ def _extract_tool_definitions(self, tool_config: Dict[str, Any]) -> List[ToolDef for tool in tools: tool_spec = _get_attr(tool, "toolSpec", {}) tool_definition_info = ToolDefinition( - name=_get_attr(tool_spec, "name", ""), - description=_get_attr(tool_spec, "description", ""), + name=str(_get_attr(tool_spec, "name", "")), + description=str(_get_attr(tool_spec, "description", "")), schema=_get_attr(tool_spec, "inputSchema", {}), ) tool_definitions.append(tool_definition_info) diff --git a/ddtrace/llmobs/_integrations/bedrock_agents.py b/ddtrace/llmobs/_integrations/bedrock_agents.py index cec0b4b0eaa..9f59b0a1082 100644 --- a/ddtrace/llmobs/_integrations/bedrock_agents.py +++ b/ddtrace/llmobs/_integrations/bedrock_agents.py @@ -3,12 +3,12 @@ import sys from typing import Any from typing import Dict +from typing import List +from typing import Literal from typing import Optional from typing import Tuple from ddtrace._trace.span import Span -from ddtrace.constants import ERROR_MSG -from ddtrace.constants import ERROR_TYPE from ddtrace.internal._rand import rand128bits from ddtrace.internal.logger import get_logger from ddtrace.internal.utils.formats import format_trace_id @@ -17,6 +17,12 @@ from ddtrace.llmobs._utils import _get_ml_app from ddtrace.llmobs._utils import _get_session_id from ddtrace.llmobs._utils import safe_json +from ddtrace.llmobs._writer import LLMObsSpanEvent +from ddtrace.llmobs.types import Message +from ddtrace.llmobs.types import _ErrorField +from ddtrace.llmobs.types import 
_Meta +from ddtrace.llmobs.types import _MetaIO +from ddtrace.llmobs.types import _SpanField log = get_logger(__name__) @@ -51,7 +57,7 @@ def _build_span_event( metadata=None, input_val=None, output_val=None, -): +) -> LLMObsSpanEvent: if span_id is None: span_id = rand128bits() apm_trace_id = format_trace_id(root_span.trace_id) @@ -61,7 +67,7 @@ def _build_span_event( session_id = _get_session_id(root_span) ml_app = _get_ml_app(root_span) tags = [f"ml_app:{ml_app}", f"session_id:{session_id}", "integration:bedrock_agents"] - span_event = { + span_event: LLMObsSpanEvent = { "name": span_name, "span_id": str(span_id), "trace_id": format_trace_id(llmobs_trace_id), @@ -70,12 +76,13 @@ def _build_span_event( "start_ns": int(start_ns or root_span.start_ns), "duration": int(duration_ns or DEFAULT_SPAN_DURATION), "status": "error" if error else "ok", - "meta": { - "span.kind": str(span_kind), - "metadata": {}, - "input": {}, - "output": {}, - }, + "meta": _Meta( + span=_SpanField(kind=span_kind), + metadata={}, + input=_MetaIO(), + output=_MetaIO(), + error=_ErrorField(), + ), "metrics": {}, "_dd": { "span_id": str(span_id), @@ -85,15 +92,15 @@ def _build_span_event( } if metadata is not None: span_event["meta"]["metadata"] = metadata - io_key = "messages" if span_kind == "llm" else "value" - if input_val is not None: + io_key: Literal["messages", "value"] = "messages" if span_kind == "llm" else "value" + if input_val is not None and "input" in span_event["meta"]: span_event["meta"]["input"][io_key] = input_val - if output_val is not None: + if output_val is not None and "output" in span_event["meta"]: span_event["meta"]["output"][io_key] = output_val - if error_msg is not None: - span_event["meta"][ERROR_MSG] = error_msg - if error_type is not None: - span_event["meta"][ERROR_TYPE] = error_type + if error_msg is not None and "error" in span_event["meta"]: + span_event["meta"]["error"]["message"] = error_msg + if error_type is not None and "error" in span_event["meta"]: + span_event["meta"]["error"]["type"] = error_type return span_event @@ -176,7 +183,7 @@ def _create_or_update_bedrock_trace_step_span(trace, trace_step_id, inner_span_e def _translate_custom_orchestration_trace( trace: Dict[str, Any], root_span: Span, current_active_span: Optional[Dict[str, Any]], trace_step_id: str -) -> Tuple[Optional[Dict[str, Any]], bool]: +) -> Tuple[Optional[LLMObsSpanEvent], bool]: """Translates a custom orchestration bedrock trace into a LLMObs span event. Returns the translated span event and a boolean indicating if the trace is finished. """ @@ -197,8 +204,8 @@ def _translate_custom_orchestration_trace( def _translate_orchestration_trace( - trace: Dict[str, Any], root_span: Span, current_active_span: Optional[Dict[str, Any]], trace_step_id: str -) -> Tuple[Optional[Dict[str, Any]], bool]: + trace: Dict[str, Any], root_span: Span, current_active_span: Optional[LLMObsSpanEvent], trace_step_id: str +) -> Tuple[Optional[LLMObsSpanEvent], bool]: """Translates an orchestration bedrock trace into a LLMObs span event. Returns the translated span event and a boolean indicating if the trace is finished. """ @@ -224,7 +231,7 @@ def _translate_orchestration_trace( def _translate_failure_trace( trace: Dict[str, Any], root_span: Span, current_active_span: Optional[Dict[str, Any]], trace_step_id: str -) -> Tuple[Optional[Dict[str, Any]], bool]: +) -> Tuple[Optional[LLMObsSpanEvent], bool]: """Translates a failure bedrock trace into a LLMObs span event. 
Returns the translated span event and a boolean indicating that the span is finished. """ @@ -253,7 +260,7 @@ def _translate_failure_trace( def _translate_guardrail_trace( trace: Dict[str, Any], root_span: Span, current_active_span: Optional[Dict[str, Any]], trace_step_id: str -) -> Tuple[Optional[Dict[str, Any]], bool]: +) -> Tuple[Optional[LLMObsSpanEvent], bool]: """Translates a guardrail bedrock trace into a LLMObs span event. Returns the translated span event and a boolean indicating that the span is finished. """ @@ -288,8 +295,8 @@ def _translate_guardrail_trace( def _translate_post_processing_trace( - trace: Dict[str, Any], root_span: Span, current_active_span: Optional[Dict[str, Any]], trace_step_id: str -) -> Tuple[Optional[Dict[str, Any]], bool]: + trace: Dict[str, Any], root_span: Span, current_active_span: Optional[LLMObsSpanEvent], trace_step_id: str +) -> Tuple[Optional[LLMObsSpanEvent], bool]: """Translates a postprocessing bedrock trace into a LLMObs span event. Returns the translated span event and a boolean indicating if the span is finished. """ @@ -305,8 +312,8 @@ def _translate_post_processing_trace( def _translate_pre_processing_trace( - trace: Dict[str, Any], root_span: Span, current_active_span: Optional[Dict[str, Any]], trace_step_id: str -) -> Tuple[Optional[Dict[str, Any]], bool]: + trace: Dict[str, Any], root_span: Span, current_active_span: Optional[LLMObsSpanEvent], trace_step_id: str +) -> Tuple[Optional[LLMObsSpanEvent], bool]: """Translates a preprocessing bedrock trace into a LLMObs span event. Returns the translated span event and a boolean indicating if the span is finished. """ @@ -322,8 +329,8 @@ def _translate_pre_processing_trace( def _translate_routing_classifier_trace( - trace: Dict[str, Any], root_span: Span, current_active_span: Optional[Dict[str, Any]], trace_step_id: str -) -> Tuple[Optional[Dict[str, Any]], bool]: + trace: Dict[str, Any], root_span: Span, current_active_span: Optional[LLMObsSpanEvent], trace_step_id: str +) -> Tuple[Optional[LLMObsSpanEvent], bool]: """Translates a routing classifier bedrock trace into a LLMObs span event. Returns the translated span event and a boolean indicating if the span is finished. 
""" @@ -346,7 +353,7 @@ def _translate_routing_classifier_trace( def _model_invocation_input_span( model_input: Dict[str, Any], trace_step_id: str, start_ns: int, root_span: Span -) -> Optional[Dict[str, Any]]: +) -> Optional[LLMObsSpanEvent]: """Translates a Bedrock model invocation input trace into a LLMObs span event.""" model_id = model_input.get("foundationModel", "") model_provider, model_name = parse_model_id(model_id) @@ -355,9 +362,9 @@ def _model_invocation_input_span( except (json.JSONDecodeError, UnicodeDecodeError): log.warning("Failed to decode model input text.") text = {} - input_messages = [{"content": text.get("system", ""), "role": "system"}] + input_messages: List[Message] = [Message(content=text.get("system", ""), role="system")] for message in text.get("messages", []): - input_messages.append({"content": message.get("content", ""), "role": message.get("role", "")}) + input_messages.append(Message(content=message.get("content", ""), role=message.get("role", ""))) span_event = _build_span_event( "modelInvocation", root_span, @@ -371,39 +378,40 @@ def _model_invocation_input_span( def _model_invocation_output_span( - model_output: Dict[str, Any], current_active_span: Optional[Dict[str, Any]], root_span: Span -) -> Optional[Dict[str, Any]]: + model_output: Dict[str, Any], current_active_span: Optional[LLMObsSpanEvent], root_span: Span +) -> Optional[LLMObsSpanEvent]: """Translates a Bedrock model invocation output trace into a LLMObs span event.""" if not current_active_span: log.warning("Error in processing modelInvocationOutput.") return None bedrock_metadata = model_output.get("metadata", {}) start_ns, duration_ns = _extract_start_and_duration_from_metadata(bedrock_metadata, root_span) - output_messages = [] + output_messages: List[Message] = [] parsed_response = model_output.get("parsedResponse", {}) if parsed_response: - output_messages.append({"content": safe_json(parsed_response), "role": "assistant"}) + output_messages.append(Message(content=safe_json(parsed_response) or "", role="assistant")) else: raw_response = model_output.get("rawResponse", {}).get("content", "") - output_messages.append({"content": raw_response, "role": "assistant"}) + output_messages.append(Message(content=raw_response, role="assistant")) reasoning_text = model_output.get("reasoningContent", {}).get("reasoningText", {}) - if reasoning_text: - current_active_span["metadata"]["reasoningText"] = str(reasoning_text.get("text", "")) + if reasoning_text and "metadata" in current_active_span["meta"]: + current_active_span["meta"]["metadata"]["reasoningText"] = str(reasoning_text.get("text", "")) token_metrics = { "input_tokens": bedrock_metadata.get("usage", {}).get("inputTokens", 0), "output_tokens": bedrock_metadata.get("usage", {}).get("outputTokens", 0), } current_active_span["start_ns"] = int(start_ns) current_active_span["duration"] = int(duration_ns) - current_active_span["meta"]["output"]["messages"] = output_messages + if "output" in current_active_span["meta"]: + current_active_span["meta"]["output"]["messages"] = output_messages current_active_span["metrics"] = token_metrics return current_active_span def _rationale_span( rationale: Dict[str, Any], trace_step_id: str, start_ns: int, root_span: Span -) -> Optional[Dict[str, Any]]: +) -> Optional[LLMObsSpanEvent]: """Translates a Bedrock rationale trace into a LLMObs span event.""" span_event = _build_span_event( "reasoning", root_span, trace_step_id, "task", start_ns=start_ns, output_val=rationale.get("text", "") @@ -413,7 +421,7 @@ def 
_rationale_span( def _invocation_input_span( invocation_input: Dict[str, Any], trace_step_id: str, start_ns: int, root_span: Span -) -> Optional[Dict[str, Any]]: +) -> Optional[LLMObsSpanEvent]: """Translates a Bedrock invocation input trace into a LLMObs span event.""" span_name = "" tool_metadata = {} @@ -447,8 +455,8 @@ def _invocation_input_span( def _observation_span( - observation: Dict[str, Any], root_span: Span, current_active_span: Optional[Dict[str, Any]] -) -> Optional[Dict[str, Any]]: + observation: Dict[str, Any], root_span: Span, current_active_span: Optional[LLMObsSpanEvent] +) -> Optional[LLMObsSpanEvent]: """Translates a Bedrock observation trace into a LLMObs span event.""" observation_type = observation.get("type", "") if observation_type in ("FINISH", "REPROMPT"): @@ -479,7 +487,8 @@ def _observation_span( start_ns, duration_ns = _extract_start_and_duration_from_metadata(bedrock_metadata, root_span) current_active_span["start_ns"] = int(start_ns) current_active_span["duration"] = int(duration_ns) - current_active_span["meta"]["output"]["value"] = output_value + if "output" in current_active_span["meta"]: + current_active_span["meta"]["output"]["value"] = output_value return current_active_span diff --git a/ddtrace/llmobs/_integrations/crewai.py b/ddtrace/llmobs/_integrations/crewai.py index e5f104132df..de34ffce85d 100644 --- a/ddtrace/llmobs/_integrations/crewai.py +++ b/ddtrace/llmobs/_integrations/crewai.py @@ -24,6 +24,7 @@ from ddtrace.llmobs._integrations.base import BaseLLMIntegration from ddtrace.llmobs._utils import _get_nearest_llmobs_ancestor from ddtrace.llmobs._utils import safe_json +from ddtrace.llmobs.types import _SpanLink from ddtrace.trace import Span @@ -123,11 +124,11 @@ def _llmobs_set_tags_crew(self, span, args, kwargs, response): task_span_ids = self._crews_to_task_span_ids.get(crew_id, []) if task_span_ids: last_task_span_id = task_span_ids[-1] - span_link = { - "span_id": last_task_span_id, - "trace_id": format_trace_id(span.trace_id), - "attributes": {"from": "output", "to": "output"}, - } + span_link = _SpanLink( + span_id=last_task_span_id, + trace_id=format_trace_id(span.trace_id), + attributes={"from": "output", "to": "output"}, + ) curr_span_links = span._get_ctx_item(SPAN_LINKS) or [] span._set_ctx_item(SPAN_LINKS, curr_span_links + [span_link]) metadata = { @@ -160,11 +161,11 @@ def _llmobs_set_tags_task(self, span, args, kwargs, response): span_links = self._crews_to_tasks[crew_id].get(str(task_id), {}).get("span_links", []) if self._is_planning_task(span): parent_span = _get_nearest_llmobs_ancestor(span) - span_link = { - "span_id": str(parent_span.span_id), - "trace_id": format_trace_id(span.trace_id), - "attributes": {"from": "input", "to": "input"}, - } + span_link = _SpanLink( + span_id=str(parent_span.span_id), + trace_id=format_trace_id(span.trace_id), + attributes={"from": "input", "to": "input"}, + ) span_links.append(span_link) curr_span_links = span._get_ctx_item(SPAN_LINKS) or [] span._set_ctx_item(SPAN_LINKS, curr_span_links + span_links) @@ -186,18 +187,18 @@ def _llmobs_set_tags_agent(self, span, args, kwargs, response): context = get_argument_value(args, kwargs, 1, "context", optional=True) or "" parent_span = _get_nearest_llmobs_ancestor(span) - parent_span_link = { - "span_id": str(span.span_id), - "trace_id": format_trace_id(span.trace_id), - "attributes": {"from": "output", "to": "output"}, - } + parent_span_link = _SpanLink( + span_id=str(span.span_id), + trace_id=format_trace_id(span.trace_id), + 
attributes={"from": "output", "to": "output"}, + ) curr_span_links = parent_span._get_ctx_item(SPAN_LINKS) or [] parent_span._set_ctx_item(SPAN_LINKS, curr_span_links + [parent_span_link]) - span_link = { - "span_id": str(parent_span.span_id), - "trace_id": format_trace_id(span.trace_id), - "attributes": {"from": "input", "to": "input"}, - } + span_link = _SpanLink( + span_id=str(parent_span.span_id), + trace_id=format_trace_id(span.trace_id), + attributes={"from": "input", "to": "input"}, + ) curr_span_links = span._get_ctx_item(SPAN_LINKS) or [] span._set_ctx_items( { @@ -306,11 +307,11 @@ def _llmobs_set_tags_flow_method(self, span, args, kwargs, response): ) if span.name in getattr(flow_instance, "_start_methods", []) and span._parent is not None: span_links.append( - { - "span_id": str(span._parent.span_id), - "trace_id": format_trace_id(span.trace_id), - "attributes": {"from": "input", "to": "input"}, - } + _SpanLink( + span_id=str(span._parent.span_id), + trace_id=format_trace_id(span.trace_id), + attributes={"from": "input", "to": "input"}, + ) ) if span.name in getattr(flow_instance, "_routers", []): @@ -368,11 +369,11 @@ def _llmobs_set_span_link_on_flow(self, flow_span, args, kwargs, flow_instance): if condition_type != "AND": triggered = True span_links.append( - { - "span_id": str(trigger_span_dict["span_id"]), - "trace_id": format_trace_id(flow_span.trace_id), - "attributes": {"from": "output", "to": "input"}, - } + _SpanLink( + span_id=str(trigger_span_dict["span_id"]), + trace_id=format_trace_id(flow_span.trace_id), + attributes={"from": "output", "to": "input"}, + ) ) continue if any( @@ -384,11 +385,11 @@ def _llmobs_set_span_link_on_flow(self, flow_span, args, kwargs, flow_instance): for method in listener_triggers: method_span_dict = flow_methods_to_spans.get(method, {}) span_links.append( - { - "span_id": str(method_span_dict["span_id"]), - "trace_id": format_trace_id(flow_span.trace_id), - "attributes": {"from": "output", "to": "input"}, - } + _SpanLink( + span_id=str(method_span_dict["span_id"]), + trace_id=format_trace_id(flow_span.trace_id), + attributes={"from": "output", "to": "input"}, + ) ) flow_span_span_links = flow_span._get_ctx_item(SPAN_LINKS) or [] # Remove temporary output->output link since the AND has been triggered @@ -400,11 +401,11 @@ def _llmobs_set_span_link_on_flow(self, flow_span, args, kwargs, flow_instance): if triggered is False: flow_span_span_links = flow_span._get_ctx_item(SPAN_LINKS) or [] flow_span_span_links.append( - { - "span_id": str(trigger_span_dict["span_id"]), - "trace_id": format_trace_id(flow_span.trace_id), - "attributes": {"from": "output", "to": "output"}, - } + _SpanLink( + span_id=str(trigger_span_dict["span_id"]), + trace_id=format_trace_id(flow_span.trace_id), + attributes={"from": "output", "to": "output"}, + ) ) flow_span._set_ctx_item(SPAN_LINKS, flow_span_span_links) return @@ -434,7 +435,7 @@ def _llmobs_set_span_link_on_task(self, span, args, kwargs): crew_id = _get_crew_id(span, "crew") is_planning_crew_instance = crew_id in self._planning_crew_ids queued_task_node = self._crews_to_tasks.get(crew_id, {}).setdefault(str(queued_task_id), {}) - span_links = [] + span_links: List[_SpanLink] = [] if isinstance(getattr(queued_task, "context", None), Iterable): for finished_task in queued_task.context: @@ -442,31 +443,31 @@ def _llmobs_set_span_link_on_task(self, span, args, kwargs): finished_task_node = self._crews_to_tasks.get(crew_id, {}).get(str(finished_task_id), {}) finished_task_span_id = 
finished_task_node.get("span_id") span_links.append( - { - "span_id": finished_task_span_id, - "trace_id": format_trace_id(span.trace_id), - "attributes": {"from": "output", "to": "input"}, - } + _SpanLink( + span_id=finished_task_span_id, + trace_id=format_trace_id(span.trace_id), + attributes={"from": "output", "to": "input"}, + ) ) queued_task_node["span_links"] = span_links return if not finished_task_outputs and not is_planning_crew_instance: queued_task_node["span_links"] = [ - { - "span_id": str(span.span_id) if span else ROOT_PARENT_ID, - "trace_id": format_trace_id(span.trace_id), - "attributes": {"from": "input", "to": "input"}, - } + _SpanLink( + span_id=str(span.span_id) if span else ROOT_PARENT_ID, + trace_id=format_trace_id(span.trace_id), + attributes={"from": "input", "to": "input"}, + ) ] return if is_planning_crew_instance and self._crews_to_task_span_ids.get(crew_id, []): planning_task_span_id = self._crews_to_task_span_ids[crew_id][-1] queued_task_node["span_links"] = [ - { - "span_id": planning_task_span_id if span else ROOT_PARENT_ID, - "trace_id": format_trace_id(span.trace_id), - "attributes": {"from": "output", "to": "input"}, - } + _SpanLink( + span_id=planning_task_span_id if span else ROOT_PARENT_ID, + trace_id=format_trace_id(span.trace_id), + attributes={"from": "output", "to": "input"}, + ) ] return @@ -475,11 +476,11 @@ def _llmobs_set_span_link_on_task(self, span, args, kwargs): for i in range(1, num_tasks_to_link + 1): # Iterate backwards through last n finished tasks finished_task_span_id = finished_task_spans[-i] span_links.append( - { - "span_id": finished_task_span_id, - "trace_id": format_trace_id(span.trace_id), - "attributes": {"from": "output", "to": "input"}, - } + _SpanLink( + span_id=finished_task_span_id, + trace_id=format_trace_id(span.trace_id), + attributes={"from": "output", "to": "input"}, + ) ) queued_task_node["span_links"] = span_links return diff --git a/ddtrace/llmobs/_integrations/gemini.py b/ddtrace/llmobs/_integrations/gemini.py index ef230f85b95..664f9b7dfb9 100644 --- a/ddtrace/llmobs/_integrations/gemini.py +++ b/ddtrace/llmobs/_integrations/gemini.py @@ -20,6 +20,7 @@ from ddtrace.llmobs._integrations.google_utils import get_system_instructions_gemini_vertexai from ddtrace.llmobs._integrations.google_utils import llmobs_get_metadata_gemini_vertexai from ddtrace.llmobs._utils import _get_attr +from ddtrace.llmobs.types import Message from ddtrace.trace import Span @@ -47,9 +48,9 @@ def _llmobs_set_tags( system_instruction = get_system_instructions_gemini_vertexai(instance) input_contents = get_argument_value(args, kwargs, 0, "contents") - input_messages = self._extract_input_message(input_contents, system_instruction) + input_messages: List[Message] = self._extract_input_message(input_contents, system_instruction) - output_messages = [{"content": ""}] + output_messages: List[Message] = [Message(content="")] if response is not None: output_messages = self._extract_output_message(response) @@ -66,30 +67,30 @@ def _llmobs_set_tags( ) def _extract_input_message(self, contents, system_instruction=None): - messages = [] + messages: List[Message] = [] if system_instruction: for instruction in system_instruction: - messages.append({"content": instruction or "", "role": "system"}) + messages.append(Message(content=instruction or "", role="system")) if isinstance(contents, str): - messages.append({"content": contents}) + messages.append(Message(content=contents)) return messages if isinstance(contents, dict): - message = {"content": 
contents.get("text", "")} + message = Message(content=contents.get("text", "")) if contents.get("role", None): message["role"] = contents["role"] messages.append(message) return messages if not isinstance(contents, list): - messages.append({"content": "[Non-text content object: {}]".format(repr(contents))}) + messages.append(Message(content="[Non-text content object: {}]".format(repr(contents)))) return messages for content in contents: if isinstance(content, str): - messages.append({"content": content}) + messages.append(Message(content=content)) continue role = _get_attr(content, "role", None) parts = _get_attr(content, "parts", []) if not parts or not isinstance(parts, Iterable): - message = {"content": "[Non-text content object: {}]".format(repr(content))} + message = Message(content="[Non-text content object: {}]".format(repr(content))) if role: message["role"] = role messages.append(message) diff --git a/ddtrace/llmobs/_integrations/google_genai.py b/ddtrace/llmobs/_integrations/google_genai.py index 8c89d282778..5c799d16a39 100644 --- a/ddtrace/llmobs/_integrations/google_genai.py +++ b/ddtrace/llmobs/_integrations/google_genai.py @@ -22,8 +22,9 @@ from ddtrace.llmobs._integrations.google_utils import extract_provider_and_model_name from ddtrace.llmobs._integrations.google_utils import normalize_contents_google_genai from ddtrace.llmobs._utils import _get_attr -from ddtrace.llmobs.utils import Document -from ddtrace.llmobs.utils import ToolDefinition +from ddtrace.llmobs.types import Document +from ddtrace.llmobs.types import Message +from ddtrace.llmobs.types import ToolDefinition # https://cloud.google.com/vertex-ai/generative-ai/docs/multimodal/content-generation-parameters @@ -110,8 +111,8 @@ def _llmobs_set_tags_from_embedding(self, span, args, kwargs, response): } ) - def _extract_input_messages(self, args: List[Any], kwargs: Dict[str, Any], config) -> List[Dict[str, Any]]: - messages = [] + def _extract_input_messages(self, args: List[Any], kwargs: Dict[str, Any], config) -> List[Message]: + messages: List[Message] = [] system_instruction = _get_attr(config, "system_instruction", None) if system_instruction is not None: @@ -122,17 +123,17 @@ def _extract_input_messages(self, args: List[Any], kwargs: Dict[str, Any], confi return messages - def _extract_messages_from_contents(self, contents, default_role: str) -> List[Dict[str, Any]]: - messages = [] + def _extract_messages_from_contents(self, contents, default_role: str) -> List[Message]: + messages: List[Message] = [] for content in normalize_contents_google_genai(contents): role = content.get("role") or default_role for part in content.get("parts", []): messages.append(extract_message_from_part_google_genai(part, role)) return messages - def _extract_output_messages(self, response) -> List[Dict[str, Any]]: + def _extract_output_messages(self, response) -> List[Message]: if not response: - return [{"content": "", "role": GOOGLE_GENAI_DEFAULT_MODEL_ROLE}] + return [Message(content="", role=GOOGLE_GENAI_DEFAULT_MODEL_ROLE)] messages = [] candidates = _get_attr(response, "candidates", []) for candidate in candidates: @@ -156,7 +157,7 @@ def _extract_embedding_output_value(self, response) -> str: def _extract_embedding_input_documents(self, args, kwargs, config) -> List[Document]: contents = kwargs.get("contents") messages = self._extract_messages_from_contents(contents, "user") - documents = [Document(text=str(message["content"])) for message in messages] + documents = [Document(text=str(message.get("content", ""))) for message 
in messages] return documents def _extract_metadata(self, config, params) -> Dict[str, Any]: @@ -175,8 +176,8 @@ def _function_declaration_to_tool_definition(self, function_declaration) -> Tool schema = {"value": repr(schema)} return ToolDefinition( - name=_get_attr(function_declaration, "name", "") or "", - description=_get_attr(function_declaration, "description", "") or "", + name=str(_get_attr(function_declaration, "name", "") or ""), + description=str(_get_attr(function_declaration, "description", "") or ""), schema=schema, ) diff --git a/ddtrace/llmobs/_integrations/google_utils.py b/ddtrace/llmobs/_integrations/google_utils.py index 7155e07745a..76c12daeac8 100644 --- a/ddtrace/llmobs/_integrations/google_utils.py +++ b/ddtrace/llmobs/_integrations/google_utils.py @@ -12,8 +12,9 @@ from ddtrace.llmobs._constants import TOTAL_TOKENS_METRIC_KEY from ddtrace.llmobs._utils import _get_attr from ddtrace.llmobs._utils import safe_json -from ddtrace.llmobs.utils import ToolCall -from ddtrace.llmobs.utils import ToolResult +from ddtrace.llmobs.types import Message +from ddtrace.llmobs.types import ToolCall +from ddtrace.llmobs.types import ToolResult # Google GenAI has roles "model" and "user", but in order to stay consistent with other integrations, @@ -174,7 +175,7 @@ def extract_embedding_metrics_google_genai(response) -> Dict[str, Any]: return usage -def extract_message_from_part_google_genai(part, role: str) -> Dict[str, Any]: +def extract_message_from_part_google_genai(part, role: str) -> Message: """part is a PartUnion = Union[File, Part, PIL_Image, str] returns a dict representing a message with format {"role": role, "content": content} @@ -182,7 +183,7 @@ def extract_message_from_part_google_genai(part, role: str) -> Dict[str, Any]: if role == "model": role = GOOGLE_GENAI_DEFAULT_MODEL_ROLE - message: Dict[str, Any] = {"role": role} + message: Message = Message(role=role) if isinstance(part, str): message["content"] = part return message @@ -223,17 +224,17 @@ def extract_message_from_part_google_genai(part, role: str) -> Dict[str, Any]: if executable_code: language = _get_attr(executable_code, "language", "UNKNOWN") code = _get_attr(executable_code, "code", "") - message["content"] = safe_json({"language": str(language), "code": str(code)}) + message["content"] = safe_json({"language": str(language), "code": str(code)}) or "" return message code_execution_result = _get_attr(part, "code_execution_result", None) if code_execution_result: outcome = _get_attr(code_execution_result, "outcome", "OUTCOME_UNSPECIFIED") output = _get_attr(code_execution_result, "output", "") - message["content"] = safe_json({"outcome": str(outcome), "output": str(output)}) + message["content"] = safe_json({"outcome": str(outcome), "output": str(output)}) or "" return message - return {"content": "Unsupported file type: {}".format(type(part)), "role": role} + return Message(content="Unsupported file type: {}".format(type(part)), role=role) def llmobs_get_metadata_gemini_vertexai(kwargs, instance): @@ -252,11 +253,11 @@ def llmobs_get_metadata_gemini_vertexai(kwargs, instance): return metadata -def extract_message_from_part_gemini_vertexai(part, role=None): +def extract_message_from_part_gemini_vertexai(part, role=None) -> Message: text = _get_attr(part, "text", "") function_call = _get_attr(part, "function_call", None) function_response = _get_attr(part, "function_response", None) - message = {"content": text} + message = Message(content=str(text)) if role: message["role"] = role if function_call: @@ -323,7 
+324,7 @@ def get_system_instructions_gemini_vertexai(model_instance): return system_instructions -def extract_messages_from_adk_events(events) -> List[Dict[str, Any]]: +def extract_messages_from_adk_events(events) -> List[Message]: """ Extract messages from Google ADK Event objects. diff --git a/ddtrace/llmobs/_integrations/langchain.py b/ddtrace/llmobs/_integrations/langchain.py index 27dc53f89a9..1ccfb665950 100644 --- a/ddtrace/llmobs/_integrations/langchain.py +++ b/ddtrace/llmobs/_integrations/langchain.py @@ -42,7 +42,10 @@ from ddtrace.llmobs._utils import _get_nearest_llmobs_ancestor from ddtrace.llmobs._utils import _validate_prompt from ddtrace.llmobs._utils import safe_json -from ddtrace.llmobs.utils import Document +from ddtrace.llmobs.types import Document +from ddtrace.llmobs.types import Message +from ddtrace.llmobs.types import ToolCall +from ddtrace.llmobs.types import _SpanLink from ddtrace.trace import Span @@ -298,7 +301,7 @@ def _set_output_links(self, span: Span, parent_span: Span, invoker_spans: List[S This is done by removing span links of previous steps in the chain from the parent span (if it is a chain). We add output->output span links at every step. """ - parent_links = parent_span._get_ctx_item(SPAN_LINKS) or [] + parent_links: List[_SpanLink] = parent_span._get_ctx_item(SPAN_LINKS) or [] pop_indices = self._get_popped_span_link_indices(parent_span, parent_links, invoker_spans, from_output) self._set_span_links( @@ -310,7 +313,7 @@ def _set_output_links(self, span: Span, parent_span: Span, invoker_spans: List[S ) def _get_popped_span_link_indices( - self, parent_span: Span, parent_links: List[Dict[str, Any]], invoker_spans: List[Span], from_output: bool + self, parent_span: Span, parent_links: List[_SpanLink], invoker_spans: List[Span], from_output: bool ) -> List[int]: """ Returns a list of indices to pop from the parent span links list @@ -340,17 +343,17 @@ def _set_span_links( popped_span_link_indices: Optional[List[int]] = None, ) -> None: """Sets the span links on the given span along with the existing links.""" - existing_links = span._get_ctx_item(SPAN_LINKS) or [] + existing_links: List[_SpanLink] = span._get_ctx_item(SPAN_LINKS) or [] if popped_span_link_indices: existing_links = [link for i, link in enumerate(existing_links) if i not in popped_span_link_indices] - links = [ - { - "trace_id": format_trace_id(from_span.trace_id), - "span_id": str(from_span.span_id), - "attributes": {"from": link_from, "to": link_to}, - } + links: List[_SpanLink] = [ + _SpanLink( + trace_id=format_trace_id(from_span.trace_id), + span_id=str(from_span.span_id), + attributes={"from": link_from, "to": link_to}, + ) for from_span in from_spans if from_span is not None ] @@ -440,7 +443,7 @@ def _llmobs_set_tags_from_llm( # chat and llm take the same input types for streamed calls input_messages = self._handle_stream_input_messages(prompts) else: - input_messages = [{"content": str(prompt)} for prompt in prompts] + input_messages = [Message(content=str(prompt)) for prompt in prompts] span._set_ctx_items( { @@ -454,12 +457,12 @@ def _llmobs_set_tags_from_llm( self._llmobs_set_metadata(span, kwargs) if span.error: - span._set_ctx_item(output_tag_key, [{"content": ""}]) + span._set_ctx_item(output_tag_key, [Message(content="")]) return if stream: - message_content = [{"content": completions}] # single completion for streams + message_content = [Message(content=completions)] # single completion for streams else: - message_content = [{"content": completion[0].text} for completion 
in completions.generations] + message_content = [Message(content=completion[0].text) for completion in completions.generations] if not is_workflow: input_tokens, output_tokens, total_tokens = self.check_token_usage_chat_or_llm_result(completions) if total_tokens > 0: @@ -493,7 +496,7 @@ def _llmobs_set_tags_from_chat_model( output_tag_key = OUTPUT_VALUE if is_workflow else OUTPUT_MESSAGES stream = span.get_tag("langchain.request.stream") - input_messages = [] + input_messages: List[Message] = [] if stream: chat_messages = get_argument_value(args, kwargs, 0, "input") input_messages = self._handle_stream_input_messages(chat_messages) @@ -507,7 +510,7 @@ def _llmobs_set_tags_from_chat_model( message.get("content", "") if isinstance(message, dict) else getattr(message, "content", "") ) role = getattr(message, "role", ROLE_MAPPING.get(getattr(message, "type", ""), "")) - input_messages.append({"content": str(content), "role": str(role)}) + input_messages.append(Message(content=str(content), role=str(role))) tool_call_id = _get_attr(message, "tool_call_id", "") if not is_workflow and tool_call_id: core.dispatch( @@ -520,14 +523,14 @@ def _llmobs_set_tags_from_chat_model( span._set_ctx_item(input_tag_key, input_messages) if span.error: - span._set_ctx_item(output_tag_key, [{"content": ""}]) + span._set_ctx_item(output_tag_key, [Message(content="")]) return - output_messages = [] + output_messages: List[Message] = [] if stream: content = chat_completions.content role = chat_completions.__class__.__name__.replace("MessageChunk", "").lower() # AIMessageChunk --> ai - span._set_ctx_item(output_tag_key, [{"content": content, "role": ROLE_MAPPING.get(role, "")}]) + span._set_ctx_item(output_tag_key, [Message(content=content, role=ROLE_MAPPING.get(role, ""))]) return input_tokens, output_tokens, total_tokens = 0, 0, 0 @@ -542,7 +545,7 @@ def _llmobs_set_tags_from_chat_model( for chat_completion in message_set: chat_completion_msg = chat_completion.message role = getattr(chat_completion_msg, "role", ROLE_MAPPING.get(chat_completion_msg.type, "")) - output_message = {"content": str(chat_completion.text), "role": role} + output_message = Message(content=str(chat_completion.text), role=role) tool_calls_info = self._extract_tool_calls(chat_completion_msg) if not is_workflow: for tool_call in tool_calls_info: @@ -589,30 +592,30 @@ def _llmobs_set_tags_from_chat_model( } span._set_ctx_item(METRICS, metrics) - def _extract_tool_calls(self, chat_completion_msg: Any) -> List[Dict[str, Any]]: + def _extract_tool_calls(self, chat_completion_msg: Any) -> List[ToolCall]: """Extracts tool calls from a langchain chat completion.""" tool_calls = getattr(chat_completion_msg, "tool_calls", None) - tool_calls_info = [] + tool_calls_info: List[ToolCall] = [] if tool_calls: if not isinstance(tool_calls, list): tool_calls = [tool_calls] for tool_call in tool_calls: - tool_call_info = { - "name": tool_call.get("name", ""), - "arguments": tool_call.get("args", {}), # this is already a dict - "tool_id": tool_call.get("id", ""), - } + tool_call_info = ToolCall( + name=tool_call.get("name", ""), + arguments=tool_call.get("args", {}), # this is already a dict + tool_id=tool_call.get("id", ""), + ) tool_calls_info.append(tool_call_info) return tool_calls_info - def _handle_stream_input_messages(self, inputs): - input_messages = [] + def _handle_stream_input_messages(self, inputs) -> List[Message]: + input_messages: List[Message] = [] if hasattr(inputs, "to_messages"): # isinstance(inputs, langchain_core.prompt_values.PromptValue) 
inputs = inputs.to_messages() elif not isinstance(inputs, list): inputs = [inputs] for inp in inputs: - inp_message = {} + inp_message = Message() content, role = None, None if isinstance(inp, dict): content = str(inp.get("content", "")) @@ -676,7 +679,7 @@ def _llmobs_set_meta_tags_from_embedding( else: if isinstance(input_texts, str): input_texts = [input_texts] - input_documents = [Document(text=str(doc)) for doc in input_texts] + input_documents: List[Document] = [Document(text=str(doc)) for doc in input_texts] span._set_ctx_item(input_tag_key, input_documents) except TypeError: log.warning("Failed to serialize embedding input data to JSON") @@ -725,7 +728,7 @@ def _llmobs_set_meta_tags_from_similarity_search( if is_workflow: span._set_ctx_item(OUTPUT_VALUE, "[{} document(s) retrieved]".format(len(output_documents))) return - documents = [] + documents: List[Document] = [] for d in output_documents: doc = Document(text=d.page_content) doc["id"] = getattr(d, "id", "") diff --git a/ddtrace/llmobs/_integrations/langgraph.py b/ddtrace/llmobs/_integrations/langgraph.py index 6c12bd1ecf4..97c7b3a0a77 100644 --- a/ddtrace/llmobs/_integrations/langgraph.py +++ b/ddtrace/llmobs/_integrations/langgraph.py @@ -27,6 +27,7 @@ from ddtrace.llmobs._integrations.utils import format_langchain_io from ddtrace.llmobs._utils import _get_attr from ddtrace.llmobs._utils import _get_nearest_llmobs_ancestor +from ddtrace.llmobs.types import _SpanLink from ddtrace.trace import Span @@ -93,11 +94,11 @@ def _llmobs_set_tags( self._get_node_metadata_from_span(span, instance_id) if operation == "node" or is_subgraph else {} ) - span_links = [_default_span_link(span)] - invoked_node_span_links = invoked_node.get("span_links") - if invoked_node_span_links is not None: + span_links: List[_SpanLink] = [_default_span_link(span)] + invoked_node_span_links: List[_SpanLink] = invoked_node.get("span_links") or [] + if invoked_node_span_links: span_links = invoked_node_span_links - current_span_links = span._get_ctx_item(SPAN_LINKS) or [] + current_span_links: List[_SpanLink] = span._get_ctx_item(SPAN_LINKS) or [] def maybe_format_langchain_io(messages): if messages is None: @@ -230,23 +231,24 @@ def _handle_finished_graph(self, graph_span: Span, finished_tasks: dict[str, Any not whether it is a standalone graph (called internally during a node execution). 
""" graph_caller_span = _get_nearest_llmobs_ancestor(graph_span) if graph_span else None - output_span_links = [ - { - **self._graph_nodes_for_graph_by_task_id[graph_span][task_id]["span"], - "attributes": {"from": "output", "to": "output"}, - } + output_span_links: List[_SpanLink] = [ + _SpanLink( + span_id=self._graph_nodes_for_graph_by_task_id[graph_span][task_id]["span"]["span_id"], + trace_id=self._graph_nodes_for_graph_by_task_id[graph_span][task_id]["span"]["trace_id"], + attributes={"from": "output", "to": "output"}, + ) for task_id in finished_tasks.keys() ] - graph_span_span_links = graph_span._get_ctx_item(SPAN_LINKS) or [] + graph_span_span_links: List[_SpanLink] = graph_span._get_ctx_item(SPAN_LINKS) or [] graph_span._set_ctx_item(SPAN_LINKS, graph_span_span_links + output_span_links) if graph_caller_span is not None and not is_subgraph_node: - graph_caller_span_links = graph_caller_span._get_ctx_item(SPAN_LINKS) or [] - span_links = [ - { - "span_id": str(graph_span.span_id) or "undefined", - "trace_id": format_trace_id(graph_span.trace_id), - "attributes": {"from": "output", "to": "output"}, - } + graph_caller_span_links: List[_SpanLink] = graph_caller_span._get_ctx_item(SPAN_LINKS) or [] + span_links: List[_SpanLink] = [ + _SpanLink( + span_id=str(graph_span.span_id) or "undefined", + trace_id=format_trace_id(graph_span.trace_id), + attributes={"from": "output", "to": "output"}, + ) ] graph_caller_span._set_ctx_item(SPAN_LINKS, graph_caller_span_links + span_links) @@ -294,12 +296,12 @@ def _link_task_to_triggers( if not trigger_node_span: continue - span_link = { - "span_id": trigger_node_span.get("span_id", ""), - "trace_id": trigger_node_span.get("trace_id", ""), - "attributes": {"from": "output", "to": "input"}, - } - span_links: List[Dict[str, Any]] = queued_node.setdefault("span_links", []) + span_link = _SpanLink( + span_id=trigger_node_span.get("span_id", ""), + trace_id=trigger_node_span.get("trace_id", ""), + attributes={"from": "output", "to": "input"}, + ) + span_links: List[_SpanLink] = queued_node.setdefault("span_links", []) span_links.append(span_link) return trigger_ids @@ -311,7 +313,7 @@ def _link_standalone_terminal_tasks( Default handler that links any finished tasks not used as triggers for queued tasks to the outer graph span. """ standalone_terminal_task_ids = set(finished_tasks.keys()) - used_finished_tasks_ids - graph_span_links = graph_span._get_ctx_item(SPAN_LINKS) or [] + graph_span_links: List[_SpanLink] = graph_span._get_ctx_item(SPAN_LINKS) or [] for finished_task_id in standalone_terminal_task_ids: node = self._graph_nodes_for_graph_by_task_id.get(graph_span, {}).get(finished_task_id) if node is None: @@ -322,11 +324,11 @@ def _link_standalone_terminal_tasks( continue graph_span_links.append( - { - "span_id": span.get("span_id", ""), - "trace_id": span.get("trace_id", ""), - "attributes": {"from": "output", "to": "output"}, - } + _SpanLink( + span_id=span.get("span_id", ""), + trace_id=span.get("trace_id", ""), + attributes={"from": "output", "to": "output"}, + ) ) graph_span._set_ctx_item(SPAN_LINKS, graph_span_links) @@ -537,14 +539,14 @@ def _append_finished_task_to_channel_writes_map( tasks_for_trigger.append(finished_task_id) -def _default_span_link(span: Span) -> dict: +def _default_span_link(span: Span) -> _SpanLink: """ Create a default input-to-input span link for a given span, if there are no referenced spans that represent the causal link. In this case, we assume the span is linked to its parent's input. 
""" - return { - "span_id": span._get_ctx_item(PARENT_ID_KEY) or ROOT_PARENT_ID, - "trace_id": format_trace_id(span.trace_id), - "attributes": {"from": "input", "to": "input"}, - } + return _SpanLink( + span_id=span._get_ctx_item(PARENT_ID_KEY) or ROOT_PARENT_ID, + trace_id=format_trace_id(span.trace_id), + attributes={"from": "input", "to": "input"}, + ) diff --git a/ddtrace/llmobs/_integrations/openai.py b/ddtrace/llmobs/_integrations/openai.py index 35306d43ed6..75b769ccc00 100644 --- a/ddtrace/llmobs/_integrations/openai.py +++ b/ddtrace/llmobs/_integrations/openai.py @@ -27,7 +27,7 @@ from ddtrace.llmobs._integrations.utils import openai_set_meta_tags_from_response from ddtrace.llmobs._integrations.utils import update_proxy_workflow_input_output_value from ddtrace.llmobs._utils import _get_attr -from ddtrace.llmobs.utils import Document +from ddtrace.llmobs.types import Document from ddtrace.trace import Span @@ -132,7 +132,7 @@ def _llmobs_set_meta_tags_from_embedding(span: Span, kwargs: Dict[str, Any], res embedding_inputs = kwargs.get("input", "") if isinstance(embedding_inputs, str) or isinstance(embedding_inputs[0], int): embedding_inputs = [embedding_inputs] - input_documents = [] + input_documents: List[Document] = [] for doc in embedding_inputs: input_documents.append(Document(text=str(doc))) span._set_ctx_items({METADATA: metadata, INPUT_DOCUMENTS: input_documents}) diff --git a/ddtrace/llmobs/_integrations/openai_agents.py b/ddtrace/llmobs/_integrations/openai_agents.py index e535c784136..33e73ae98d8 100644 --- a/ddtrace/llmobs/_integrations/openai_agents.py +++ b/ddtrace/llmobs/_integrations/openai_agents.py @@ -237,9 +237,9 @@ def _llmobs_set_response_attributes(self, span: Span, oai_span: OaiSpanAdapter) core.dispatch( DISPATCH_ON_LLM_TOOL_CHOICE, ( - tool_call_output["tool_id"], - tool_call_output["name"], - safe_json(tool_call_output["arguments"]), + tool_call_output.get("tool_id", ""), + tool_call_output.get("name", ""), + safe_json(tool_call_output.get("arguments", {})), { "trace_id": format_trace_id(span.trace_id), "span_id": str(span.span_id), diff --git a/ddtrace/llmobs/_integrations/utils.py b/ddtrace/llmobs/_integrations/utils.py index 3a1542fe794..a1e658bd307 100644 --- a/ddtrace/llmobs/_integrations/utils.py +++ b/ddtrace/llmobs/_integrations/utils.py @@ -29,9 +29,10 @@ from ddtrace.llmobs._utils import load_data_value from ddtrace.llmobs._utils import safe_json from ddtrace.llmobs._utils import safe_load_json -from ddtrace.llmobs.utils import ToolCall -from ddtrace.llmobs.utils import ToolDefinition -from ddtrace.llmobs.utils import ToolResult +from ddtrace.llmobs.types import Message +from ddtrace.llmobs.types import ToolCall +from ddtrace.llmobs.types import ToolDefinition +from ddtrace.llmobs.types import ToolResult try: @@ -227,7 +228,7 @@ def get_content_from_langchain_message(message) -> Union[str, Tuple[str, str]]: return str(message) -def get_messages_from_converse_content(role: str, content: List[Dict[str, Any]]): +def get_messages_from_converse_content(role: str, content: List[Dict[str, Any]]) -> List[Message]: """ Extracts out a list of messages from a converse `content` field. 
@@ -239,13 +240,11 @@ def get_messages_from_converse_content(role: str, content: List[Dict[str, Any]]) """ if not content or not isinstance(content, list) or not isinstance(content[0], dict): return [] - messages: List[Dict[str, Union[str, List[Dict[str, Any]], List[ToolCall], List[ToolResult]]]] = [] + messages: List[Message] = [] content_blocks = [] tool_calls_info = [] - tool_messages: List[Dict[str, Any]] = [] - unsupported_content_messages: List[ - Dict[str, Union[str, List[Dict[str, Any]], List[ToolCall], List[ToolResult]]] - ] = [] + tool_messages: List[Message] = [] + unsupported_content_messages: List[Message] = [] for content_block in content: if content_block.get("text") and isinstance(content_block.get("text"), str): content_blocks.append(content_block.get("text", "")) @@ -275,21 +274,22 @@ def get_messages_from_converse_content(role: str, content: List[Dict[str, Any]]) type="toolResult", ) tool_messages.append( - { - "tool_results": [tool_result_info], - "role": "user", - } + Message( + tool_results=[tool_result_info], + role="user", + ) ) else: content_type = ",".join(content_block.keys()) unsupported_content_messages.append( - {"content": "[Unsupported content type: {}]".format(content_type), "role": role} + Message(content="[Unsupported content type: {}]".format(content_type), role=role) ) - message: Dict[str, Union[str, List[Dict[str, Any]], List[ToolCall], List[ToolResult]]] = {} + message: Message = Message() if tool_calls_info: - message.update({"tool_calls": tool_calls_info}) + message["tool_calls"] = tool_calls_info if content_blocks: - message.update({"content": " ".join(content_blocks), "role": role}) + message["content"] = " ".join(content_blocks) + message["role"] = role if message: messages.append(message) if unsupported_content_messages: @@ -307,13 +307,13 @@ def openai_set_meta_tags_from_completion( if isinstance(prompt, str): prompt = [prompt] parameters = get_metadata_from_kwargs(kwargs, integration_name, "completion") - output_messages = [{"content": ""}] + output_messages = [Message(content="")] if not span.error and completions: choices = getattr(completions, "choices", completions) - output_messages = [{"content": _get_attr(choice, "text", "")} for choice in choices] + output_messages = [Message(content=str(_get_attr(choice, "text", ""))) for choice in choices] span._set_ctx_items( { - INPUT_MESSAGES: [{"content": str(p)} for p in prompt], + INPUT_MESSAGES: [Message(content=p) for p in prompt], METADATA: parameters, OUTPUT_MESSAGES: output_messages, } @@ -324,14 +324,11 @@ def openai_set_meta_tags_from_chat( span: Span, kwargs: Dict[str, Any], messages: Optional[Any], integration_name: str = "openai" ) -> None: """Extract prompt/response tags from a chat completion and set them as temporary "_ml_obs.meta.*" tags.""" - input_messages = [] + input_messages: List[Message] = [] for m in kwargs.get("messages", []): content = str(_get_attr(m, "content", "")) role = str(_get_attr(m, "role", "")) - processed_message: Dict[str, Union[str, List[ToolCall], List[ToolResult]]] = { - "content": content, - "role": role, - } + processed_message: Message = Message(content=content, role=role) tool_call_id = _get_attr(m, "tool_call_id", None) if tool_call_id: core.dispatch(DISPATCH_ON_TOOL_CALL_OUTPUT_USED, (tool_call_id, span)) @@ -358,16 +355,16 @@ def openai_set_meta_tags_from_chat( span._set_ctx_item(TOOL_DEFINITIONS, tools) if span.error or not messages: - span._set_ctx_item(OUTPUT_MESSAGES, [{"content": ""}]) + span._set_ctx_item(OUTPUT_MESSAGES, 
[Message(content="")]) return if isinstance(messages, list): # streamed response role = "" - output_messages = [] + output_messages: List[Message] = [] for streamed_message in messages: # litellm roles appear only on the first choice, so store it to be used for all choices role = streamed_message.get("role", "") or role content = streamed_message.get("content", "") - message = {"content": content, "role": role} + message = Message(content=content, role=role) extracted_tool_calls, _ = _openai_extract_tool_calls_and_results_chat( streamed_message, llm_span=span, dispatch_llm_choice=True @@ -391,7 +388,7 @@ def openai_set_meta_tags_from_chat( ) capture_plain_text_tool_usage(extracted_tool_calls, extracted_tool_results, content, span) - message = {"content": content, "role": role} + message = Message(content=str(content), role=str(role)) if extracted_tool_calls: message["tool_calls"] = extracted_tool_calls if extracted_tool_results: @@ -433,10 +430,10 @@ def _openai_extract_tool_calls_and_results_chat( raw_args = safe_load_json(raw_args) if isinstance(raw_args, str) else raw_args tool_call_info = ToolCall( - name=tool_name, + name=str(tool_name), arguments=raw_args, - tool_id=tool_id, - type=tool_type, + tool_id=str(tool_id), + type=str(tool_type), ) tool_calls.append(tool_call_info) @@ -444,10 +441,10 @@ def _openai_extract_tool_calls_and_results_chat( if _get_attr(message, "role", "") == "tool": result = _get_attr(message, "content", "") tool_result_info = ToolResult( - name=_get_attr(message, "name", ""), + name=str(_get_attr(message, "name", "")), result=str(result) if result else "", - tool_id=_get_attr(message, "tool_call_id", ""), - type=_get_attr(message, "type", "tool_result"), + tool_id=str(_get_attr(message, "tool_call_id", "")), + type=str(_get_attr(message, "type", "tool_result")), ) tool_results.append(tool_result_info) @@ -541,7 +538,7 @@ def get_metadata_from_kwargs( def openai_get_input_messages_from_response_input( messages: Optional[Union[str, List[Dict[str, Any]]]], -) -> List[Dict[str, Any]]: +) -> List[Message]: """Parses the input to openai responses api into a list of input messages Args: @@ -556,7 +553,7 @@ def openai_get_input_messages_from_response_input( def _openai_parse_input_response_messages( messages: Optional[Union[str, List[Dict[str, Any]]]], system_instructions: Optional[str] = None -) -> Tuple[List[Dict[str, Any]], List[str]]: +) -> Tuple[List[Message], List[str]]: """ Parses input messages from the openai responses api into a list of processed messages and a list of tool call IDs. 
@@ -568,20 +565,20 @@ def _openai_parse_input_response_messages( - A list of processed messages - A list of tool call IDs """ - processed: List[Dict[str, Any]] = [] + processed: List[Message] = [] tool_call_ids: List[str] = [] if system_instructions: - processed.append({"role": "system", "content": system_instructions}) + processed.append(Message(role="system", content=system_instructions)) if not messages: return processed, tool_call_ids if isinstance(messages, str): - return [{"role": "user", "content": messages}], tool_call_ids + return [Message(content=messages, role="user")], tool_call_ids for item in messages: - processed_item: Dict[str, Union[str, List[ToolCall], List[ToolResult]]] = {} + processed_item: Message = Message() # Handle regular message if "content" in item and "role" in item: processed_item_content = "" @@ -619,7 +616,7 @@ def _openai_parse_input_response_messages( output = safe_json(output) tool_result_info = ToolResult( tool_id=item["call_id"], - result=output, + result=str(output) if output else "", name=item.get("name", ""), type=item.get("type", "function_call_output"), ) @@ -636,7 +633,7 @@ def _openai_parse_input_response_messages( return processed, tool_call_ids -def openai_get_output_messages_from_response(response: Optional[Any]) -> List[Dict[str, Any]]: +def openai_get_output_messages_from_response(response: Optional[Any]) -> List[Message]: """ Parses the output to openai responses api into a list of output messages @@ -658,7 +655,7 @@ def openai_get_output_messages_from_response(response: Optional[Any]) -> List[Di return processed_messages -def _openai_parse_output_response_messages(messages: List[Any]) -> Tuple[List[Dict[str, Any]], List[ToolCall]]: +def _openai_parse_output_response_messages(messages: List[Any]) -> Tuple[List[Message], List[ToolCall]]: """ Parses output messages from the openai responses api into a list of processed messages and a list of tool call outputs. 
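The output-side parser applies the same str() coercion before building each ToolCall, since _get_attr can hand back non-string SDK or mock objects. A hedged sketch of the function_call branch, with plain dict access and json.loads standing in for _get_attr and safe_load_json (the sample payload is illustrative):

import json
from typing import Any, Dict, TypedDict


class ToolCall(TypedDict, total=False):
    name: str
    arguments: Dict[str, Any]
    tool_id: str
    type: str


def to_tool_call(item: Dict[str, Any]) -> ToolCall:
    raw_arguments = item.get("arguments") or "{}"
    try:
        # safe_load_json is more tolerant; json.loads stands in for it here.
        arguments = json.loads(str(raw_arguments))
    except ValueError:
        arguments = {}
    return ToolCall(
        tool_id=str(item.get("call_id", "")),
        arguments=arguments,
        name=str(item.get("name", "")),
        type=str(item.get("type", "function")),
    )


# call_id arrives as an int here, which the str() coercion normalizes.
call = to_tool_call({"call_id": 123, "name": "lookup", "arguments": '{"q": "ddtrace"}', "type": "function_call"})
assert call["tool_id"] == "123" and call["arguments"] == {"q": "ddtrace"}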
@@ -670,11 +667,11 @@ def _openai_parse_output_response_messages(messages: List[Any]) -> Tuple[List[Di - A list of processed messages - A list of tool call outputs """ - processed: List[Dict[str, Any]] = [] + processed: List[Message] = [] tool_call_outputs: List[ToolCall] = [] for item in messages: - message = {} + message: Message = Message() message_type = _get_attr(item, "type", "") if message_type == "message": @@ -682,7 +679,7 @@ def _openai_parse_output_response_messages(messages: List[Any]) -> Tuple[List[Di for content in _get_attr(item, "content", []) or []: text += str(_get_attr(content, "text", "") or "") text += str(_get_attr(content, "refusal", "") or "") - message.update({"role": _get_attr(item, "role", "assistant"), "content": text}) + message.update({"role": str(_get_attr(item, "role", "assistant")), "content": text}) elif message_type == "reasoning": message.update( { @@ -693,19 +690,20 @@ def _openai_parse_output_response_messages(messages: List[Any]) -> Tuple[List[Di "encrypted_content": _get_attr(item, "encrypted_content", ""), "id": _get_attr(item, "id", ""), } - ), + ) + or "", } ) elif message_type == "function_call" or message_type == "custom_tool_call": call_id = _get_attr(item, "call_id", "") name = _get_attr(item, "name", "") raw_arguments = _get_attr(item, "input", "") or _get_attr(item, "arguments", OAI_HANDOFF_TOOL_ARG) - arguments = safe_load_json(raw_arguments) + arguments = safe_load_json(str(raw_arguments)) tool_call_info = ToolCall( - tool_id=call_id, + tool_id=str(call_id), arguments=arguments, - name=name, - type=_get_attr(item, "type", "function"), + name=str(name), + type=str(_get_attr(item, "type", "function")), ) tool_call_outputs.append(tool_call_info) message.update( @@ -753,7 +751,7 @@ def openai_set_meta_tags_from_response(span: Span, kwargs: Dict[str, Any], respo input_messages = openai_get_input_messages_from_response_input(input_data) if "instructions" in kwargs: - input_messages.insert(0, {"content": str(kwargs["instructions"]), "role": "system"}) + input_messages.insert(0, Message(content=str(kwargs["instructions"]), role="system")) span._set_ctx_items( { @@ -763,14 +761,14 @@ def openai_set_meta_tags_from_response(span: Span, kwargs: Dict[str, Any], respo ) if span.error or not response: - span._set_ctx_item(OUTPUT_MESSAGES, [{"content": ""}]) + span._set_ctx_item(OUTPUT_MESSAGES, [Message(content="")]) return # The response potentially contains enriched metadata (ex. 
tool calls) not in the original request metadata = span._get_ctx_item(METADATA) or {} metadata.update(openai_get_metadata_from_response(response)) span._set_ctx_item(METADATA, metadata) - output_messages = openai_get_output_messages_from_response(response) + output_messages: List[Message] = openai_get_output_messages_from_response(response) span._set_ctx_item(OUTPUT_MESSAGES, output_messages) tools = _openai_get_tool_definitions(kwargs.get("tools") or []) if tools: @@ -784,24 +782,24 @@ def _openai_get_tool_definitions(tools: List[Any]) -> List[ToolDefinition]: if _get_attr(tool, "function", None): function = _get_attr(tool, "function", {}) tool_definition = ToolDefinition( - name=_get_attr(function, "name", ""), - description=_get_attr(function, "description", ""), + name=str(_get_attr(function, "name", "")), + description=str(_get_attr(function, "description", "")), schema=_get_attr(function, "parameters", {}), ) # chat API custom tool access elif _get_attr(tool, "custom", None): custom_tool = _get_attr(tool, "custom", {}) tool_definition = ToolDefinition( - name=_get_attr(custom_tool, "name", ""), - description=_get_attr(custom_tool, "description", ""), + name=str(_get_attr(custom_tool, "name", "")), + description=str(_get_attr(custom_tool, "description", "")), schema=_get_attr(custom_tool, "format", {}), # format is a dict ) # chat API function access and response API tool access # only handles FunctionToolParam and CustomToolParam for response API for now else: tool_definition = ToolDefinition( - name=_get_attr(tool, "name", ""), - description=_get_attr(tool, "description", ""), + name=str(_get_attr(tool, "name", "")), + description=str(_get_attr(tool, "description", "")), schema=_get_attr(tool, "parameters", {}) or _get_attr(tool, "format", {}), ) if not any(tool_definition.values()): @@ -1108,7 +1106,7 @@ def get_error_data(self) -> Optional[Dict[str, Any]]: return None return self.error.get("data") - def llmobs_input_messages(self) -> Tuple[List[Dict[str, Any]], List[str]]: + def llmobs_input_messages(self) -> Tuple[List[Message], List[str]]: """Returns processed input messages for LLM Obs LLM spans. Returns: @@ -1117,7 +1115,7 @@ def llmobs_input_messages(self) -> Tuple[List[Dict[str, Any]], List[str]]: """ return _openai_parse_input_response_messages(self.input, self.response_system_instructions) - def llmobs_output_messages(self) -> Tuple[List[Dict[str, Any]], List[ToolCall]]: + def llmobs_output_messages(self) -> Tuple[List[Message], List[ToolCall]]: """Returns processed output messages for LLM Obs LLM spans. Returns: @@ -1213,7 +1211,7 @@ class LLMObsTraceInfo: def get_final_message_converse_stream_message( message: Dict[str, Any], text_blocks: Dict[int, str], tool_blocks: Dict[int, Dict[str, Any]] -) -> Dict[str, Any]: +) -> Message: """Process a message and its content blocks into LLM Obs message format. 
Args: @@ -1225,7 +1223,7 @@ def get_final_message_converse_stream_message( Dict containing the processed message with content and optional tool calls """ indices = sorted(message.get("content_block_indicies", [])) - message_output = {"role": message["role"]} + message_output = Message(role=message["role"]) text_contents = [text_blocks[idx] for idx in indices if idx in text_blocks] message_output.update({"content": "".join(text_contents)} if text_contents else {}) diff --git a/ddtrace/llmobs/_integrations/vertexai.py b/ddtrace/llmobs/_integrations/vertexai.py index d982d423f0a..f392e608bc7 100644 --- a/ddtrace/llmobs/_integrations/vertexai.py +++ b/ddtrace/llmobs/_integrations/vertexai.py @@ -22,6 +22,7 @@ from ddtrace.llmobs._integrations.google_utils import get_system_instructions_gemini_vertexai from ddtrace.llmobs._integrations.google_utils import llmobs_get_metadata_gemini_vertexai from ddtrace.llmobs._utils import _get_attr +from ddtrace.llmobs.types import Message from ddtrace.trace import Span @@ -57,7 +58,7 @@ def _llmobs_set_tags( input_contents = get_argument_value(args, kwargs, 0, "contents") input_messages = self._extract_input_message(input_contents, history, system_instruction) - output_messages = [{"content": ""}] + output_messages: List[Message] = [Message(content="")] if response is not None: output_messages = self._extract_output_message(response) metrics = self._extract_metrics_from_response(response) @@ -109,28 +110,28 @@ def _extract_metrics_from_response(self, response): return metrics - def _extract_input_message(self, contents, history, system_instruction=None): + def _extract_input_message(self, contents, history, system_instruction=None) -> List[Message]: from vertexai.generative_models._generative_models import Part - messages = [] + messages: List[Message] = [] if system_instruction: for instruction in system_instruction: - messages.append({"content": instruction or "", "role": "system"}) + messages.append(Message(content=instruction or "", role="system")) for content in history: messages.extend(self._extract_messages_from_content(content)) if isinstance(contents, str): - messages.append({"content": contents}) + messages.append(Message(content=contents)) return messages if isinstance(contents, Part): message = extract_message_from_part_gemini_vertexai(contents) messages.append(message) return messages if not isinstance(contents, list): - messages.append({"content": "[Non-text content object: {}]".format(repr(contents))}) + messages.append(Message(content="[Non-text content object: {}]".format(repr(contents)))) return messages for content in contents: if isinstance(content, str): - messages.append({"content": content}) + messages.append(Message(content=content)) continue if isinstance(content, Part): message = extract_message_from_part_gemini_vertexai(content) @@ -139,8 +140,8 @@ def _extract_input_message(self, contents, history, system_instruction=None): messages.extend(self._extract_messages_from_content(content)) return messages - def _extract_output_message(self, generations): - output_messages = [] + def _extract_output_message(self, generations) -> List[Message]: + output_messages: List[Message] = [] # streamed responses will be a list of chunks if isinstance(generations, list): message_content = "" @@ -153,7 +154,7 @@ def _extract_output_message(self, generations): for message in messages: message_content += message.get("content", "") tool_calls.extend(message.get("tool_calls", [])) - message = {"content": message_content, "role": role} + message = 
Message(content=message_content, role=role) if tool_calls: message["tool_calls"] = tool_calls return [message] @@ -164,14 +165,14 @@ def _extract_output_message(self, generations): return output_messages @staticmethod - def _extract_messages_from_content(content): - messages = [] + def _extract_messages_from_content(content) -> List[Message]: + messages: List[Message] = [] role = _get_attr(content, "role", "") parts = _get_attr(content, "parts", []) if not parts or not isinstance(parts, Iterable): - message = {"content": "[Non-text content object: {}]".format(repr(content))} + message = Message(content="[Non-text content object: {}]".format(repr(content))) if role: - message["role"] = role + message["role"] = str(role) messages.append(message) return messages for part in parts: diff --git a/ddtrace/llmobs/_llmobs.py b/ddtrace/llmobs/_llmobs.py index 558e1276d21..62ec21aacca 100644 --- a/ddtrace/llmobs/_llmobs.py +++ b/ddtrace/llmobs/_llmobs.py @@ -13,7 +13,6 @@ from typing import Optional from typing import Set from typing import Tuple -from typing import TypedDict from typing import Union from typing import cast @@ -109,10 +108,15 @@ from ddtrace.llmobs._writer import LLMObsSpanEvent from ddtrace.llmobs._writer import LLMObsSpanWriter from ddtrace.llmobs._writer import should_use_agentless +from ddtrace.llmobs.types import ExportedLLMObsSpan +from ddtrace.llmobs.types import Message +from ddtrace.llmobs.types import Prompt +from ddtrace.llmobs.types import _ErrorField +from ddtrace.llmobs.types import _Meta +from ddtrace.llmobs.types import _MetaIO +from ddtrace.llmobs.types import _SpanField from ddtrace.llmobs.utils import Documents -from ddtrace.llmobs.utils import ExportedLLMObsSpan from ddtrace.llmobs.utils import Messages -from ddtrace.llmobs.utils import Prompt from ddtrace.llmobs.utils import extract_tool_definitions from ddtrace.propagation.http import HTTPPropagator @@ -163,10 +167,6 @@ def span_processor(span: LLMObsSpan) -> Optional[LLMObsSpan]: return span """ - class Message(TypedDict): - content: str - role: str - input: List[Message] = field(default_factory=list) output: List[Message] = field(default_factory=list) _tags: Dict[str, str] = field(default_factory=dict) @@ -270,9 +270,9 @@ def _llmobs_span_event(self, span: Span) -> Optional[LLMObsSpanEvent]: "apm_trace_id": format_trace_id(span.trace_id), } - meta: Dict[str, Any] = {"span.kind": span_kind, "input": {}, "output": {}} + meta = _Meta(span=_SpanField(kind=span_kind), input=_MetaIO(), output=_MetaIO()) if span_kind in ("llm", "embedding") and span._get_ctx_item(MODEL_NAME) is not None: - meta["model_name"] = span._get_ctx_item(MODEL_NAME) + meta["model_name"] = span._get_ctx_item(MODEL_NAME) or "" meta["model_provider"] = (span._get_ctx_item(MODEL_PROVIDER) or "custom").lower() metadata = span._get_ctx_item(METADATA) or {} if span_kind == "agent" and span._get_ctx_item(AGENT_MANIFEST) is not None: @@ -284,7 +284,7 @@ def _llmobs_span_event(self, span: Span) -> Optional[LLMObsSpanEvent]: if span._get_ctx_item(INPUT_VALUE) is not None: input_type = "value" llmobs_span.input = [ - {"content": safe_json(span._get_ctx_item(INPUT_VALUE), ensure_ascii=False), "role": ""} + Message(content=safe_json(span._get_ctx_item(INPUT_VALUE), ensure_ascii=False) or "", role="") ] if span.context.get_baggage_item(EXPERIMENT_ID_KEY): @@ -305,23 +305,23 @@ def _llmobs_span_event(self, span: Span) -> Optional[LLMObsSpanEvent]: input_messages = span._get_ctx_item(INPUT_MESSAGES) if span_kind == "llm" and input_messages is not None: 
input_type = "messages" - llmobs_span.input = cast(List[LLMObsSpan.Message], enforce_message_role(input_messages)) + llmobs_span.input = cast(List[Message], enforce_message_role(input_messages)) if span._get_ctx_item(OUTPUT_VALUE) is not None: output_type = "value" llmobs_span.output = [ - {"content": safe_json(span._get_ctx_item(OUTPUT_VALUE), ensure_ascii=False), "role": ""} + Message(content=safe_json(span._get_ctx_item(OUTPUT_VALUE), ensure_ascii=False) or "", role="") ] output_messages = span._get_ctx_item(OUTPUT_MESSAGES) if span_kind == "llm" and output_messages is not None: output_type = "messages" - llmobs_span.output = cast(List[LLMObsSpan.Message], enforce_message_role(output_messages)) + llmobs_span.output = cast(List[Message], enforce_message_role(output_messages)) if span_kind == "embedding" and span._get_ctx_item(INPUT_DOCUMENTS) is not None: - meta["input"]["documents"] = span._get_ctx_item(INPUT_DOCUMENTS) + meta["input"]["documents"] = span._get_ctx_item(INPUT_DOCUMENTS) or [] if span_kind == "retrieval" and span._get_ctx_item(OUTPUT_DOCUMENTS) is not None: - meta["output"]["documents"] = span._get_ctx_item(OUTPUT_DOCUMENTS) + meta["output"]["documents"] = span._get_ctx_item(OUTPUT_DOCUMENTS) or [] if span._get_ctx_item(INPUT_PROMPT) is not None: prompt_json_str = span._get_ctx_item(INPUT_PROMPT) @@ -330,7 +330,8 @@ def _llmobs_span_event(self, span: Span) -> Optional[LLMObsSpanEvent]: "Dropping prompt on non-LLM span kind, annotating prompts is only supported for LLM span kinds." ) else: - meta["input"]["prompt"] = prompt_json_str + prompt_dict = cast(Prompt, prompt_json_str) + meta["input"]["prompt"] = prompt_dict elif span_kind == "llm": parent_span = _get_nearest_llmobs_ancestor(span) if parent_span is not None: @@ -339,14 +340,12 @@ def _llmobs_span_event(self, span: Span) -> Optional[LLMObsSpanEvent]: meta["input"]["prompt"] = parent_prompt if span._get_ctx_item(TOOL_DEFINITIONS) is not None: - meta["tool_definitions"] = span._get_ctx_item(TOOL_DEFINITIONS) + meta["tool_definitions"] = span._get_ctx_item(TOOL_DEFINITIONS) or [] if span.error: - meta.update( - { - ERROR_MSG: span.get_tag(ERROR_MSG), - ERROR_STACK: span.get_tag(ERROR_STACK), - ERROR_TYPE: span.get_tag(ERROR_TYPE), - } + meta["error"] = _ErrorField( + message=span.get_tag(ERROR_MSG) or "", + stack=span.get_tag(ERROR_STACK) or "", + type=span.get_tag(ERROR_TYPE) or "", ) if self._user_span_processor: @@ -371,12 +370,12 @@ def _llmobs_span_event(self, span: Span) -> Optional[LLMObsSpanEvent]: if input_type == "messages": meta["input"]["messages"] = llmobs_span.input elif input_type == "value": - meta["input"]["value"] = llmobs_span.input[0]["content"] + meta["input"]["value"] = llmobs_span.input[0].get("content", "") if llmobs_span.output is not None: if output_type == "messages": meta["output"]["messages"] = llmobs_span.output elif output_type == "value": - meta["output"]["value"] = llmobs_span.output[0]["content"] + meta["output"]["value"] = llmobs_span.output[0].get("content", "") if not meta["input"]: meta.pop("input") diff --git a/ddtrace/llmobs/_utils.py b/ddtrace/llmobs/_utils.py index e2b235a3b17..444279c07cc 100644 --- a/ddtrace/llmobs/_utils.py +++ b/ddtrace/llmobs/_utils.py @@ -29,8 +29,9 @@ from ddtrace.llmobs._constants import SESSION_ID from ddtrace.llmobs._constants import SPAN_LINKS from ddtrace.llmobs._constants import VERTEXAI_APM_SPAN_NAME -from ddtrace.llmobs.utils import Message -from ddtrace.llmobs.utils import Prompt +from ddtrace.llmobs.types import Message +from 
ddtrace.llmobs.types import Prompt +from ddtrace.llmobs.types import _SpanLink from ddtrace.trace import Span @@ -298,13 +299,13 @@ def format_tool_call_arguments(tool_args: str) -> str: def add_span_link(span: Span, span_id: str, trace_id: str, from_io: str, to_io: str) -> None: - current_span_links = span._get_ctx_item(SPAN_LINKS) or [] + current_span_links: List[_SpanLink] = span._get_ctx_item(SPAN_LINKS) or [] current_span_links.append( - { - "span_id": span_id, - "trace_id": trace_id, - "attributes": {"from": from_io, "to": to_io}, - } + _SpanLink( + span_id=span_id, + trace_id=trace_id, + attributes={"from": from_io, "to": to_io}, + ) ) span._set_ctx_item(SPAN_LINKS, current_span_links) diff --git a/ddtrace/llmobs/_writer.py b/ddtrace/llmobs/_writer.py index 45781b5b059..3b62ea3bdda 100644 --- a/ddtrace/llmobs/_writer.py +++ b/ddtrace/llmobs/_writer.py @@ -45,6 +45,8 @@ from ddtrace.llmobs._experiment import Project from ddtrace.llmobs._experiment import UpdatableDatasetRecord from ddtrace.llmobs._utils import safe_json +from ddtrace.llmobs.types import _Meta +from ddtrace.llmobs.types import _SpanLink from ddtrace.settings._agent import config as agent_config @@ -56,7 +58,7 @@ class _LLMObsSpanEventOptional(TypedDict, total=False): service: str status_message: str collection_errors: List[str] - span_links: List[Dict[str, str]] + span_links: List[_SpanLink] class LLMObsSpanEvent(_LLMObsSpanEventOptional): @@ -68,7 +70,7 @@ class LLMObsSpanEvent(_LLMObsSpanEventOptional): start_ns: int duration: int status: str - meta: Dict[str, Any] + meta: _Meta metrics: Dict[str, Any] _dd: Dict[str, str] diff --git a/ddtrace/llmobs/types.py b/ddtrace/llmobs/types.py new file mode 100644 index 00000000000..09072a36e51 --- /dev/null +++ b/ddtrace/llmobs/types.py @@ -0,0 +1,110 @@ +from typing import Any +from typing import Dict +from typing import List +from typing import TypedDict +from typing import Union + + +class ExportedLLMObsSpan(TypedDict): + span_id: str + trace_id: str + + +class Document(TypedDict, total=False): + name: str + id: str + text: str + score: float + + +class ToolCall(TypedDict, total=False): + name: str + arguments: Dict[str, Any] + tool_id: str + type: str + + +class ToolResult(TypedDict, total=False): + name: str + result: str + tool_id: str + type: str + + +class ToolDefinition(TypedDict, total=False): + name: str + description: str + schema: Dict[str, Any] + + +class Message(TypedDict, total=False): + id: str + role: str + content: str + tool_calls: List[ToolCall] + tool_results: List[ToolResult] + tool_id: str + + +class _SpanField(TypedDict): + kind: str + + +class _ErrorField(TypedDict, total=False): + message: str + stack: str + type: str + + +class Prompt(TypedDict, total=False): + """ + A Prompt object that contains the information needed to render a prompt. + id: str - the id of the prompt set by the user. Should be unique per ml_app. + version: str - user tag for the version of the prompt. + variables: Dict[str, str] - a dictionary of variables that will be used to render the prompt + chat_template: Optional[Union[List[Dict[str, str]], List[Message]]] + - A list of dicts of (role,template) + where role is the role of the prompt and template is the template string + template: Optional[str] + - It also accepts a string that represents the template for the prompt. Will default to "user" for a role + tags: Optional[Dict[str, str]] + - List of tags to add to the prompt run. 
+ rag_context_variables: List[str] - a list of variable key names that contain ground truth context information + rag_query_variables: List[str] - a list of variable key names that contains query information + """ + + version: str + id: str + template: str + chat_template: Union[List[Dict[str, str]], List[Message]] + variables: Dict[str, str] + tags: Dict[str, str] + rag_context_variables: List[str] + rag_query_variables: List[str] + + +class _MetaIO(TypedDict, total=False): + parameters: Dict[str, Any] + value: str + messages: List[Message] + prompt: Prompt + documents: List[Document] + + +class _Meta(TypedDict, total=False): + model_name: str + model_provider: str + span: _SpanField + error: _ErrorField + metadata: Dict[str, Any] + input: _MetaIO + output: _MetaIO + expected_output: _MetaIO + evaluations: Any + tool_definitions: List[ToolDefinition] + + +class _SpanLink(TypedDict): + span_id: str + trace_id: str + attributes: Dict[str, str] diff --git a/ddtrace/llmobs/utils.py b/ddtrace/llmobs/utils.py index ec81504a957..5d007dfa4a9 100644 --- a/ddtrace/llmobs/utils.py +++ b/ddtrace/llmobs/utils.py @@ -1,10 +1,14 @@ from typing import Any from typing import Dict from typing import List -from typing import TypedDict # noqa:F401 from typing import Union from ddtrace.internal.logger import get_logger +from ddtrace.llmobs.types import Document +from ddtrace.llmobs.types import Message +from ddtrace.llmobs.types import ToolCall +from ddtrace.llmobs.types import ToolDefinition +from ddtrace.llmobs.types import ToolResult log = get_logger(__name__) @@ -68,44 +72,6 @@ def _extract_tool_result(tool_result: Dict[str, Any]) -> "ToolResult": return formatted_tool_result -ExportedLLMObsSpan = TypedDict("ExportedLLMObsSpan", {"span_id": str, "trace_id": str}) -Document = TypedDict("Document", {"name": str, "id": str, "text": str, "score": float}, total=False) -Message = TypedDict( - "Message", - {"content": str, "role": str, "tool_calls": List["ToolCall"], "tool_results": List["ToolResult"]}, - total=False, -) -ToolCall = TypedDict( - "ToolCall", - { - "name": str, - "arguments": Dict[str, Any], - "tool_id": str, - "type": str, - }, - total=False, -) -ToolResult = TypedDict( - "ToolResult", - { - "name": str, - "result": str, - "tool_id": str, - "type": str, - }, - total=False, -) -ToolDefinition = TypedDict( - "ToolDefinition", - { - "name": str, - "description": str, - "schema": Dict[str, Any], - }, - total=False, -) - - def extract_tool_definitions(tool_definitions: List[Dict[str, Any]]) -> List[ToolDefinition]: """Return a list of validated tool definitions.""" if not isinstance(tool_definitions, list): @@ -149,33 +115,6 @@ def extract_tool_definitions(tool_definitions: List[Dict[str, Any]]) -> List[Too return validated_tool_definitions -class Prompt(TypedDict, total=False): - """ - A Prompt object that contains the information needed to render a prompt. - id: str - the id of the prompt set by the user. Should be unique per ml_app. - version: str - user tag for the version of the prompt. - variables: Dict[str, str] - a dictionary of variables that will be used to render the prompt - chat_template: Optional[Union[List[Dict[str, str]], List[Message]]] - - A list of dicts of (role,template) - where role is the role of the prompt and template is the template string - template: Optional[str] - - It also accepts a string that represents the template for the prompt. Will default to "user" for a role - tags: Optional[Dict[str, str]] - - List of tags to add to the prompt run. 
- rag_context_variables: List[str] - a list of variable key names that contain ground truth context information - rag_query_variables: List[str] - a list of variable key names that contains query information - """ - - version: str - id: str - template: str - chat_template: Union[List[Dict[str, str]], List[Message]] - variables: Dict[str, str] - tags: Dict[str, str] - rag_context_variables: List[str] - rag_query_variables: List[str] - - class Messages: def __init__(self, messages: Union[List[Dict[str, Any]], Dict[str, Any], str]): self.messages = [] diff --git a/tests/contrib/anthropic/test_anthropic_llmobs.py b/tests/contrib/anthropic/test_anthropic_llmobs.py index 4bd9d5cf652..b1cc0a054b9 100644 --- a/tests/contrib/anthropic/test_anthropic_llmobs.py +++ b/tests/contrib/anthropic/test_anthropic_llmobs.py @@ -121,7 +121,7 @@ def test_completion_proxy( ) span = mock_tracer.pop_traces()[0][0] assert mock_llmobs_writer.enqueue.call_count == 2 - assert mock_llmobs_writer.enqueue.call_args_list[1].args[0]["meta"]["span.kind"] == "llm" + assert mock_llmobs_writer.enqueue.call_args_list[1].args[0]["meta"]["span"]["kind"] == "llm" def test_completion(self, anthropic, ddtrace_global_config, mock_llmobs_writer, mock_tracer, request_vcr): """Ensure llmobs records are emitted for completion endpoints when configured. @@ -665,7 +665,12 @@ def test_tools_sync_stream( + " the location is fully specified. We can proceed with calling the get_weather tool.\n", "type": "text", }, - {"text": WEATHER_OUTPUT_MESSAGE_2_TOOL_CALL, "type": "text"}, + { + "name": "get_weather", + "input": {"location": "San Francisco, CA"}, + "id": "toolu_01DYJo37oETVsCdLTTcCWcdq", + "type": "tool_use", + }, ] traces = mock_tracer.pop_traces() @@ -733,7 +738,7 @@ def test_tools_sync_stream( input_messages=[ {"content": WEATHER_PROMPT, "role": "user"}, {"content": message[0]["text"], "role": "assistant"}, - {"content": message[1]["text"], "role": "assistant"}, + {"content": "", "role": "assistant", "tool_calls": WEATHER_OUTPUT_MESSAGE_2_TOOL_CALL}, {"content": "", "role": "user", "tool_results": WEATHER_TOOL_RESULT}, ], output_messages=[ diff --git a/tests/contrib/botocore/test_bedrock_agents_llmobs.py b/tests/contrib/botocore/test_bedrock_agents_llmobs.py index de2d4bd5439..3eb41802e9d 100644 --- a/tests/contrib/botocore/test_bedrock_agents_llmobs.py +++ b/tests/contrib/botocore/test_bedrock_agents_llmobs.py @@ -52,7 +52,7 @@ def _assert_agent_span(agent_span, resp_str): assert agent_span["meta"]["output"]["value"] == resp_str assert agent_span["meta"]["metadata"]["agent_alias_id"] == AGENT_ALIAS_ID assert agent_span["meta"]["metadata"]["agent_id"] == AGENT_ID - assert agent_span["meta"]["span.kind"] == "agent" + assert agent_span["meta"]["span"]["kind"] == "agent" assert "session_id:{}".format(SESSION_ID) in agent_span["tags"] @@ -64,26 +64,23 @@ def _assert_trace_step_spans(trace_step_spans): assert trace_step_spans[3]["name"].startswith("orchestrationTrace Step") assert trace_step_spans[4]["name"].startswith("orchestrationTrace Step") assert trace_step_spans[5]["name"].startswith("guardrailTrace Step") - assert all(span["meta"]["span.kind"] == "workflow" for span in trace_step_spans) + assert all(span["meta"]["span"]["kind"] == "workflow" for span in trace_step_spans) assert all(span["meta"]["metadata"]["bedrock_trace_id"] == span["span_id"] for span in trace_step_spans) def _assert_inner_span(span): assert span["name"] in ["guardrail", "modelInvocation", "reasoning", "location_suggestion"] - if span["name"] == "guardrail": - 
assert span["meta"]["span.kind"] == "task" + if span["name"] == "guardrail" or span["name"] == "reasoning": + assert span["meta"]["span"]["kind"] == "task" assert span["meta"]["output"].get("value") is not None elif span["name"] == "modelInvocation": - assert span["meta"]["span.kind"] == "llm" + assert span["meta"]["span"]["kind"] == "llm" assert span["meta"]["metadata"]["model_name"] == MODEL_NAME assert span["meta"]["metadata"]["model_provider"] == MODEL_PROVIDER assert span["metrics"].get("input_tokens") is not None assert span["metrics"].get("output_tokens") is not None - elif span["name"] == "reasoning": - assert span["meta"]["span.kind"] == "task" - assert span["meta"]["output"].get("value") is not None elif span["name"] == "location_suggestion": - assert span["meta"]["span.kind"] == "tool" + assert span["meta"]["span"]["kind"] == "tool" assert span["meta"]["output"].get("value") is not None diff --git a/tests/contrib/botocore/test_bedrock_llmobs.py b/tests/contrib/botocore/test_bedrock_llmobs.py index bef0b71d0cf..d88770452cb 100644 --- a/tests/contrib/botocore/test_bedrock_llmobs.py +++ b/tests/contrib/botocore/test_bedrock_llmobs.py @@ -721,7 +721,7 @@ def _test_llmobs_invoke_proxy( else: span = mock_tracer.pop_traces()[0][0] assert len(llmobs_events) == 1 - assert llmobs_events[0]["meta"]["span.kind"] == "llm" + assert llmobs_events[0]["meta"]["span"]["kind"] == "llm" LLMObs.disable() @@ -769,7 +769,7 @@ def _test_llmobs_invoke_stream_proxy( else: span = mock_tracer.pop_traces()[0][0] assert len(llmobs_events) == 1 - assert llmobs_events[0]["meta"]["span.kind"] == "llm" + assert llmobs_events[0]["meta"]["span"]["kind"] == "llm" LLMObs.disable() diff --git a/tests/contrib/langchain/test_langchain_llmobs.py b/tests/contrib/langchain/test_langchain_llmobs.py index 7d70fc92101..c4681f60e7a 100644 --- a/tests/contrib/langchain/test_langchain_llmobs.py +++ b/tests/contrib/langchain/test_langchain_llmobs.py @@ -100,7 +100,7 @@ def test_llmobs_openai_llm_proxy(mock_generate, langchain_openai, llmobs_events, llm.invoke("What is the capital of France?") span = tracer.pop_traces()[0][0] assert len(llmobs_events) == 2 - assert llmobs_events[1]["meta"]["span.kind"] == "llm" + assert llmobs_events[1]["meta"]["span"]["kind"] == "llm" def test_llmobs_openai_chat_model(langchain_openai, llmobs_events, tracer, openai_url): @@ -151,7 +151,7 @@ def test_llmobs_openai_chat_model_proxy(mock_generate, langchain_openai, llmobs_ chat_model.invoke([HumanMessage(content="What is the capital of France?")]) span = tracer.pop_traces()[0][0] assert len(llmobs_events) == 2 - assert llmobs_events[1]["meta"]["span.kind"] == "llm" + assert llmobs_events[1]["meta"]["span"]["kind"] == "llm" def test_llmobs_string_prompt_template_invoke(langchain_core, langchain_openai, openai_url, llmobs_events, tracer): @@ -654,8 +654,8 @@ def _assert_trace_structure_from_writer_call_args(self, span_kinds): call_args = call.args[0] assert ( - call_args["meta"]["span.kind"] == span_kind - ), f"Span kind is {call_args['meta']['span.kind']} but expected {span_kind}" + call_args["meta"]["span"]["kind"] == span_kind + ), f"Span kind is {call_args['meta']['span']['kind']} but expected {span_kind}" if span_kind == "workflow": assert len(call_args["meta"]["input"]["value"]) > 0 assert len(call_args["meta"]["output"]["value"]) > 0 diff --git a/tests/contrib/langgraph/test_langgraph_llmobs.py b/tests/contrib/langgraph/test_langgraph_llmobs.py index 58f66cbc1fa..3af7c6bde50 100644 --- a/tests/contrib/langgraph/test_langgraph_llmobs.py +++ 
b/tests/contrib/langgraph/test_langgraph_llmobs.py @@ -435,9 +435,9 @@ def test_agent_with_tool_calls_integrations_enabled( tool_span = llmobs_events[4] second_llm_span = llmobs_events[6] - assert first_llm_span["meta"]["span.kind"] == "llm" - assert tool_span["meta"]["span.kind"] == "tool" - assert second_llm_span["meta"]["span.kind"] == "llm" + assert first_llm_span["meta"]["span"]["kind"] == "llm" + assert tool_span["meta"]["span"]["kind"] == "tool" + assert second_llm_span["meta"]["span"]["kind"] == "llm" # assert llm -> tool span link assert tool_span["span_links"][1]["span_id"] == first_llm_span["span_id"] diff --git a/tests/contrib/litellm/test_litellm_llmobs.py b/tests/contrib/litellm/test_litellm_llmobs.py index 0efb663daea..5c039c56d55 100644 --- a/tests/contrib/litellm/test_litellm_llmobs.py +++ b/tests/contrib/litellm/test_litellm_llmobs.py @@ -320,7 +320,7 @@ def test_router_completion( router_event = llmobs_events[1] llm_event = llmobs_events[0] - assert llm_event["meta"]["span.kind"] == "llm" + assert llm_event["meta"]["span"]["kind"] == "llm" assert llm_event["name"] == "completion" assert router_event == _expected_llmobs_non_llm_span_event( router_span, @@ -362,7 +362,7 @@ async def test_router_acompletion( router_event = llmobs_events[1] llm_event = llmobs_events[0] - assert llm_event["meta"]["span.kind"] == "llm" + assert llm_event["meta"]["span"]["kind"] == "llm" assert llm_event["name"] == "acompletion" assert router_event == _expected_llmobs_non_llm_span_event( router_span, @@ -404,7 +404,7 @@ def test_router_text_completion( router_event = llmobs_events[1] llm_event = llmobs_events[0] - assert llm_event["meta"]["span.kind"] == "llm" + assert llm_event["meta"]["span"]["kind"] == "llm" assert llm_event["name"] == "text_completion" assert router_event == _expected_llmobs_non_llm_span_event( router_span, @@ -446,7 +446,7 @@ async def test_router_atext_completion( router_event = llmobs_events[1] llm_event = llmobs_events[0] - assert llm_event["meta"]["span.kind"] == "llm" + assert llm_event["meta"]["span"]["kind"] == "llm" assert llm_event["name"] == "atext_completion" assert router_event == _expected_llmobs_non_llm_span_event( router_span, diff --git a/tests/contrib/openai/test_openai_llmobs.py b/tests/contrib/openai/test_openai_llmobs.py index b19bf7970ad..4db68c588a4 100644 --- a/tests/contrib/openai/test_openai_llmobs.py +++ b/tests/contrib/openai/test_openai_llmobs.py @@ -110,7 +110,7 @@ def test_completion_proxy( ) span = mock_tracer.pop_traces()[0][0] assert mock_llmobs_writer.enqueue.call_count == 2 - assert mock_llmobs_writer.enqueue.call_args_list[1].args[0]["meta"]["span.kind"] == "llm" + assert mock_llmobs_writer.enqueue.call_args_list[1].args[0]["meta"]["span"]["kind"] == "llm" def test_completion(self, openai, ddtrace_global_config, mock_llmobs_writer, mock_tracer): """Ensure llmobs records are emitted for completion endpoints when configured. 
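These test hunks all track the event-shape change: the flat "span.kind" meta key becomes a nested span object. A small before/after sketch using the _SpanField and _Meta TypedDicts this patch introduces (inlined here so the snippet runs standalone):

from typing import TypedDict


class _SpanField(TypedDict):
    kind: str


class _Meta(TypedDict, total=False):
    span: _SpanField
    model_name: str


meta = _Meta(span=_SpanField(kind="llm"), model_name="ada")
assert meta["span"]["kind"] == "llm"  # previously asserted as meta["span.kind"] == "llm"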
@@ -197,7 +197,7 @@ def test_completion_azure_proxy( ) span = mock_tracer.pop_traces()[0][0] assert mock_llmobs_writer.enqueue.call_count == 2 - assert mock_llmobs_writer.enqueue.call_args_list[1].args[0]["meta"]["span.kind"] == "llm" + assert mock_llmobs_writer.enqueue.call_args_list[1].args[0]["meta"]["span"]["kind"] == "llm" @pytest.mark.skipif( parse_version(openai_module.version.VERSION) >= (1, 60), @@ -337,7 +337,7 @@ def test_chat_completion_proxy( client.chat.completions.create(model=model, messages=input_messages, top_p=0.9, n=2, user="ddtrace-test") span = mock_tracer.pop_traces()[0][0] assert mock_llmobs_writer.enqueue.call_count == 2 - assert mock_llmobs_writer.enqueue.call_args_list[1].args[0]["meta"]["span.kind"] == "llm" + assert mock_llmobs_writer.enqueue.call_args_list[1].args[0]["meta"]["span"]["kind"] == "llm" def test_chat_completion(self, openai, ddtrace_global_config, mock_llmobs_writer, mock_tracer): """Ensure llmobs records are emitted for chat completion endpoints when configured. @@ -426,7 +426,7 @@ def test_chat_completion_azure_proxy( ) span = mock_tracer.pop_traces()[0][0] assert mock_llmobs_writer.enqueue.call_count == 2 - assert mock_llmobs_writer.enqueue.call_args_list[1].args[0]["meta"]["span.kind"] == "llm" + assert mock_llmobs_writer.enqueue.call_args_list[1].args[0]["meta"]["span"]["kind"] == "llm" @pytest.mark.skipif( parse_version(openai_module.version.VERSION) >= (1, 60), diff --git a/tests/contrib/pydantic_ai/test_pydantic_ai_llmobs.py b/tests/contrib/pydantic_ai/test_pydantic_ai_llmobs.py index 1742d028605..0545ff6dbed 100644 --- a/tests/contrib/pydantic_ai/test_pydantic_ai_llmobs.py +++ b/tests/contrib/pydantic_ai/test_pydantic_ai_llmobs.py @@ -218,7 +218,7 @@ async def test_agent_iter_error(self, pydantic_ai, request_vcr, llmobs_events): assert len(llmobs_events) == 1 assert llmobs_events[0]["status"] == "error" - assert llmobs_events[0]["meta"]["error.message"] == "test error" + assert llmobs_events[0]["meta"]["error"]["message"] == "test error" @pytest.mark.skipif(PYDANTIC_AI_VERSION < (0, 4, 4), reason="pydantic-ai < 0.4.4 does not support toolsets") async def test_agent_run_with_toolset(self, pydantic_ai, request_vcr, llmobs_events, mock_tracer): diff --git a/tests/llmobs/_utils.py b/tests/llmobs/_utils.py index 00494a3eb0c..a12154f2c34 100644 --- a/tests/llmobs/_utils.py +++ b/tests/llmobs/_utils.py @@ -2,6 +2,10 @@ import mock +from ddtrace.llmobs.types import _ErrorField +from ddtrace.llmobs.types import _Meta +from ddtrace.llmobs.types import _SpanField + try: import vcr @@ -226,7 +230,7 @@ def _llmobs_base_span_event( "start_ns": span.start_ns, "duration": span.duration_ns, "status": "error" if error else "ok", - "meta": {"span.kind": span_kind}, + "meta": _Meta(span=_SpanField(kind=span_kind)), "metrics": {}, "tags": _expected_llmobs_tags(span, tags=tags, error=error, session_id=session_id), "_dd": { @@ -238,9 +242,7 @@ def _llmobs_base_span_event( if session_id: span_event["session_id"] = session_id if error: - span_event["meta"]["error.type"] = error - span_event["meta"]["error.message"] = error_message - span_event["meta"]["error.stack"] = error_stack + span_event["meta"]["error"] = _ErrorField(type=error, message=error_message or "", stack=error_stack or "") if span_links: span_event["span_links"] = mock.ANY return span_event @@ -322,7 +324,9 @@ def _completion_event(): "duration": 12345678900, "status": "ok", "meta": { - "span.kind": "llm", + "span": { + "kind": "llm", + }, "model_name": "ada", "model_provider": "openai", "input": { 
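Error details move the same way: the flat error.message/error.stack/error.type tags become a structured error field, with None tag values coalesced to "". A minimal sketch of the new construction, mirroring the _ErrorField usage in _llmobs_span_event and the test helper above (the ValueError values are illustrative):

from typing import Optional, TypedDict


class _ErrorField(TypedDict, total=False):
    message: str
    stack: str
    type: str


def build_error(message: Optional[str], stack: Optional[str], error_type: Optional[str]) -> _ErrorField:
    # Span tags can be None, so each value is coalesced to "".
    return _ErrorField(message=message or "", stack=stack or "", type=error_type or "")


meta = {"error": build_error("test error", None, "ValueError")}
assert meta["error"]["message"] == "test error"  # previously meta["error.message"]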
@@ -353,7 +357,9 @@ def _chat_completion_event(): "duration": 12345678900, "status": "ok", "meta": { - "span.kind": "llm", + "span": { + "kind": "llm", + }, "model_name": "gpt-3.5-turbo", "model_provider": "openai", "input": { @@ -391,7 +397,9 @@ def _chat_completion_event_with_unserializable_field(): "duration": 12345678900, "status": "ok", "meta": { - "span.kind": "llm", + "span": { + "kind": "llm", + }, "model_name": "gpt-3.5-turbo", "model_provider": "openai", "metadata": {"unserializable": object()}, @@ -430,7 +438,9 @@ def _large_event(): "duration": 12345678900, "status": "ok", "meta": { - "span.kind": "llm", + "span": { + "kind": "llm", + }, "model_name": "gpt-3.5-turbo", "model_provider": "openai", "input": { @@ -468,7 +478,9 @@ def _oversized_llm_event(): "duration": 12345678900, "status": "ok", "meta": { - "span.kind": "llm", + "span": { + "kind": "llm", + }, "model_name": "gpt-3.5-turbo", "model_provider": "openai", "input": { @@ -506,7 +518,9 @@ def _oversized_workflow_event(): "duration": 12345678900, "status": "ok", "meta": { - "span.kind": "workflow", + "span": { + "kind": "workflow", + }, "input": {"value": "A" * 2_600_000}, "output": {"value": "A" * 2_600_000}, }, @@ -526,7 +540,9 @@ def _oversized_retrieval_event(): "duration": 12345678900, "status": "ok", "meta": { - "span.kind": "retrieval", + "span": { + "kind": "retrieval", + }, "input": {"documents": {"content": "A" * 2_600_000}}, "output": {"value": "A" * 2_600_000}, }, @@ -621,7 +637,9 @@ def _expected_ragas_context_precision_spans(ragas_inputs=None): "duration": mock.ANY, "status": "ok", "meta": { - "span.kind": "workflow", + "span": { + "kind": "workflow", + }, "input": {"value": mock.ANY}, "output": {"value": "1.0"}, "metadata": {}, @@ -640,7 +658,9 @@ def _expected_ragas_context_precision_spans(ragas_inputs=None): "duration": mock.ANY, "status": "ok", "meta": { - "span.kind": "workflow", + "span": { + "kind": "workflow", + }, "input": {"value": mock.ANY}, "output": {"value": mock.ANY}, "metadata": {}, @@ -665,7 +685,9 @@ def _expected_ragas_faithfulness_spans(ragas_inputs=None): "duration": mock.ANY, "status": "ok", "meta": { - "span.kind": "workflow", + "span": { + "kind": "workflow", + }, "input": {"value": mock.ANY}, "output": {"value": "1.0"}, "metadata": { @@ -686,7 +708,9 @@ def _expected_ragas_faithfulness_spans(ragas_inputs=None): "duration": mock.ANY, "status": "ok", "meta": { - "span.kind": "workflow", + "span": { + "kind": "workflow", + }, "input": {"value": mock.ANY}, "output": {"value": mock.ANY}, "metadata": {}, @@ -704,7 +728,9 @@ def _expected_ragas_faithfulness_spans(ragas_inputs=None): "duration": mock.ANY, "status": "ok", "meta": { - "span.kind": "workflow", + "span": { + "kind": "workflow", + }, "input": {"value": mock.ANY}, "output": {"value": mock.ANY}, "metadata": {}, @@ -722,7 +748,7 @@ def _expected_ragas_faithfulness_spans(ragas_inputs=None): "start_ns": mock.ANY, "duration": mock.ANY, "status": "ok", - "meta": {"span.kind": "task", "metadata": {}}, + "meta": {"span": {"kind": "task"}, "metadata": {}}, "metrics": {}, "tags": expected_ragas_trace_tags(), "_dd": {"span_id": mock.ANY, "trace_id": mock.ANY, "apm_trace_id": mock.ANY}, @@ -736,7 +762,9 @@ def _expected_ragas_faithfulness_spans(ragas_inputs=None): "duration": mock.ANY, "status": "ok", "meta": { - "span.kind": "workflow", + "span": { + "kind": "workflow", + }, "input": {"value": mock.ANY}, "output": {"value": mock.ANY}, "metadata": {}, @@ -754,7 +782,7 @@ def _expected_ragas_faithfulness_spans(ragas_inputs=None): "start_ns": 
mock.ANY, "duration": mock.ANY, "status": "ok", - "meta": {"span.kind": "task", "metadata": {}}, + "meta": {"span": {"kind": "task"}, "metadata": {}}, "metrics": {}, "tags": expected_ragas_trace_tags(), "_dd": {"span_id": mock.ANY, "trace_id": mock.ANY, "apm_trace_id": mock.ANY}, @@ -768,7 +796,7 @@ def _expected_ragas_faithfulness_spans(ragas_inputs=None): "duration": mock.ANY, "status": "ok", "meta": { - "span.kind": "task", + "span": {"kind": "task"}, "output": {"value": "1.0"}, "metadata": {"faithful_statements": 1, "num_statements": 1}, }, @@ -792,7 +820,7 @@ def _expected_ragas_answer_relevancy_spans(ragas_inputs=None): "duration": mock.ANY, "status": "ok", "meta": { - "span.kind": "workflow", + "span": {"kind": "workflow"}, "input": {"value": mock.ANY}, "output": {"value": mock.ANY}, "metadata": {"answer_classifications": mock.ANY, "strictness": mock.ANY}, @@ -811,7 +839,7 @@ def _expected_ragas_answer_relevancy_spans(ragas_inputs=None): "duration": mock.ANY, "status": "ok", "meta": { - "span.kind": "workflow", + "span": {"kind": "workflow"}, "input": {"value": mock.ANY}, "output": {"value": mock.ANY}, "metadata": {}, @@ -829,7 +857,7 @@ def _expected_ragas_answer_relevancy_spans(ragas_inputs=None): "duration": mock.ANY, "status": "ok", "meta": { - "span.kind": "workflow", + "span": {"kind": "workflow"}, "input": {"value": mock.ANY}, "output": {"value": mock.ANY}, "metadata": {}, diff --git a/tests/llmobs/llmobs_cassettes/tests.llmobs.test_llmobs_span_agentless_writer.test_send_chat_completion_event.yaml b/tests/llmobs/llmobs_cassettes/tests.llmobs.test_llmobs_span_agentless_writer.test_send_chat_completion_event.yaml index 9fc04045726..8618855c52b 100644 --- a/tests/llmobs/llmobs_cassettes/tests.llmobs.test_llmobs_span_agentless_writer.test_send_chat_completion_event.yaml +++ b/tests/llmobs/llmobs_cassettes/tests.llmobs.test_llmobs_span_agentless_writer.test_send_chat_completion_event.yaml @@ -5,7 +5,7 @@ interactions: "parent_id": "", "session_id": "98765432102", "name": "chat_completion_span", "tags": ["version:", "env:", "service:tests.llmobs", "source:integration"], "start_ns": 1707763310981223936, "duration": 12345678900, "status": "ok", "meta": - {"span.kind": "llm", "model_name": "gpt-3.5-turbo", "model_provider": "openai", + {"span": {"kind": "llm"}, "model_name": "gpt-3.5-turbo", "model_provider": "openai", "input": {"messages": [{"role": "system", "content": "You are an evil dark lord looking for his one ring to rule them all"}, {"role": "user", "content": "I am a hobbit looking to go to Mordor"}]}, "output": {"messages": [{"content": diff --git a/tests/llmobs/llmobs_cassettes/tests.llmobs.test_llmobs_span_agentless_writer.test_send_completion_event.yaml b/tests/llmobs/llmobs_cassettes/tests.llmobs.test_llmobs_span_agentless_writer.test_send_completion_event.yaml index bf433ac810b..6232fbcd2e1 100644 --- a/tests/llmobs/llmobs_cassettes/tests.llmobs.test_llmobs_span_agentless_writer.test_send_completion_event.yaml +++ b/tests/llmobs/llmobs_cassettes/tests.llmobs.test_llmobs_span_agentless_writer.test_send_completion_event.yaml @@ -5,7 +5,7 @@ interactions: "98765432101", "parent_id": "", "session_id": "98765432101", "name": "completion_span", "tags": ["version:", "env:", "service:tests.llmobs", "source:integration"], "start_ns": 1707763310981223236, "duration": 12345678900, "status": "ok", "meta": - {"span.kind": "llm", "model_name": "ada", "model_provider": "openai", "input": + {"span": {"kind": "llm"}, "model_name": "ada", "model_provider": "openai", "input": 
{"messages": [{"content": "who broke enigma?"}]}, "output": {"messages": [{"content": "\n\nThe Enigma code was broken by a team of codebreakers at Bletchley Park, led by mathematician Alan Turing."}]}, "metadata": {"temperature": 0, "max_tokens": diff --git a/tests/llmobs/llmobs_cassettes/tests.llmobs.test_llmobs_span_agentless_writer.test_send_multiple_events.yaml b/tests/llmobs/llmobs_cassettes/tests.llmobs.test_llmobs_span_agentless_writer.test_send_multiple_events.yaml index a2d8d2c67e8..0af247005d6 100644 --- a/tests/llmobs/llmobs_cassettes/tests.llmobs.test_llmobs_span_agentless_writer.test_send_multiple_events.yaml +++ b/tests/llmobs/llmobs_cassettes/tests.llmobs.test_llmobs_span_agentless_writer.test_send_multiple_events.yaml @@ -5,7 +5,7 @@ interactions: "98765432101", "parent_id": "", "session_id": "98765432101", "name": "completion_span", "tags": ["version:", "env:", "service:tests.llmobs", "source:integration"], "start_ns": 1707763310981223236, "duration": 12345678900, "status": "ok", "meta": - {"span.kind": "llm", "model_name": "ada", "model_provider": "openai", "input": + {"span": {"kind": "llm"}, "model_name": "ada", "model_provider": "openai", "input": {"messages": [{"content": "who broke enigma?"}]}, "output": {"messages": [{"content": "\n\nThe Enigma code was broken by a team of codebreakers at Bletchley Park, led by mathematician Alan Turing."}]}, "metadata": {"temperature": 0, "max_tokens": @@ -15,7 +15,7 @@ interactions: "parent_id": "", "session_id": "98765432102", "name": "chat_completion_span", "tags": ["version:", "env:", "service:tests.llmobs", "source:integration"], "start_ns": 1707763310981223936, "duration": 12345678900, "status": "ok", "meta": - {"span.kind": "llm", "model_name": "gpt-3.5-turbo", "model_provider": "openai", + {"span": {"kind": "llm"}, "model_name": "gpt-3.5-turbo", "model_provider": "openai", "input": {"messages": [{"role": "system", "content": "You are an evil dark lord looking for his one ring to rule them all"}, {"role": "user", "content": "I am a hobbit looking to go to Mordor"}]}, "output": {"messages": [{"content": diff --git a/tests/llmobs/llmobs_cassettes/tests.llmobs.test_llmobs_span_agentless_writer.test_send_timed_events.yaml b/tests/llmobs/llmobs_cassettes/tests.llmobs.test_llmobs_span_agentless_writer.test_send_timed_events.yaml index ee9da5290fc..c91f44a0f5f 100644 --- a/tests/llmobs/llmobs_cassettes/tests.llmobs.test_llmobs_span_agentless_writer.test_send_timed_events.yaml +++ b/tests/llmobs/llmobs_cassettes/tests.llmobs.test_llmobs_span_agentless_writer.test_send_timed_events.yaml @@ -5,7 +5,7 @@ interactions: "98765432101", "parent_id": "", "session_id": "98765432101", "name": "completion_span", "tags": ["version:", "env:", "service:tests.llmobs", "source:integration"], "start_ns": 1707763310981223236, "duration": 12345678900, "status": "ok", "meta": - {"span.kind": "llm", "model_name": "ada", "model_provider": "openai", "input": + {"span": {"kind": "llm"}, "model_name": "ada", "model_provider": "openai", "input": {"messages": [{"content": "who broke enigma?"}]}, "output": {"messages": [{"content": "\n\nThe Enigma code was broken by a team of codebreakers at Bletchley Park, led by mathematician Alan Turing."}]}, "metadata": {"temperature": 0, "max_tokens": diff --git a/tests/llmobs/test_llmobs.py b/tests/llmobs/test_llmobs.py index 9b991805acd..b616e7695c5 100644 --- a/tests/llmobs/test_llmobs.py +++ b/tests/llmobs/test_llmobs.py @@ -12,7 +12,7 @@ from ddtrace.llmobs._constants import PARENT_ID_KEY from 
ddtrace.llmobs._constants import ROOT_PARENT_ID from ddtrace.llmobs._utils import _get_session_id -from ddtrace.llmobs.utils import Prompt +from ddtrace.llmobs.types import Prompt from tests.llmobs._utils import _expected_llmobs_llm_span_event @@ -365,9 +365,9 @@ def test_error_is_set(tracer, llmobs_events): llm_span._set_ctx_item(const.SPAN_KIND, "llm") raise ValueError("error") span_event = llmobs_events[0] - assert span_event["meta"]["error.message"] == "error" - assert "ValueError" in span_event["meta"]["error.type"] - assert 'raise ValueError("error")' in span_event["meta"]["error.stack"] + assert span_event["meta"]["error"]["message"] == "error" + assert "ValueError" in span_event["meta"]["error"]["type"] + assert 'raise ValueError("error")' in span_event["meta"]["error"]["stack"] def test_model_provider_defaults_to_custom(tracer, llmobs_events): diff --git a/tests/llmobs/test_llmobs_service.py b/tests/llmobs/test_llmobs_service.py index 2519083b7ad..3f080dd03ae 100644 --- a/tests/llmobs/test_llmobs_service.py +++ b/tests/llmobs/test_llmobs_service.py @@ -31,7 +31,7 @@ from ddtrace.llmobs._constants import SPAN_START_WHILE_DISABLED_WARNING from ddtrace.llmobs._constants import TAGS from ddtrace.llmobs._llmobs import SUPPORTED_LLMOBS_INTEGRATIONS -from ddtrace.llmobs.utils import Prompt +from ddtrace.llmobs.types import Prompt from ddtrace.trace import Context from tests.llmobs._utils import _expected_llmobs_eval_metric_event from tests.llmobs._utils import _expected_llmobs_llm_span_event diff --git a/tests/llmobs/test_llmobs_span_agent_writer.py b/tests/llmobs/test_llmobs_span_agent_writer.py index 9b0470466ab..564db83e0b3 100644 --- a/tests/llmobs/test_llmobs_span_agent_writer.py +++ b/tests/llmobs/test_llmobs_span_agent_writer.py @@ -68,9 +68,9 @@ def test_truncating_oversized_events(mock_send_payload, mock_writer_logs): llmobs_span_writer.enqueue(_oversized_workflow_event()) mock_writer_logs.warning.assert_has_calls( [ - mock.call("dropping event input/output because its size (%d) exceeds the event size limit (5MB)", 5200724), - mock.call("dropping event input/output because its size (%d) exceeds the event size limit (5MB)", 5200464), - mock.call("dropping event input/output because its size (%d) exceeds the event size limit (5MB)", 5200445), + mock.call("dropping event input/output because its size (%d) exceeds the event size limit (5MB)", 5200729), + mock.call("dropping event input/output because its size (%d) exceeds the event size limit (5MB)", 5200469), + mock.call("dropping event input/output because its size (%d) exceeds the event size limit (5MB)", 5200450), ] ) diff --git a/tests/llmobs/test_llmobs_span_agentless_writer.py b/tests/llmobs/test_llmobs_span_agentless_writer.py index 0657b7aede9..f2b34519187 100644 --- a/tests/llmobs/test_llmobs_span_agentless_writer.py +++ b/tests/llmobs/test_llmobs_span_agentless_writer.py @@ -61,9 +61,9 @@ def test_truncating_oversized_events(mock_writer_logs): llmobs_span_writer.enqueue(_oversized_workflow_event()) mock_writer_logs.warning.assert_has_calls( [ - mock.call("dropping event input/output because its size (%d) exceeds the event size limit (5MB)", 5200724), - mock.call("dropping event input/output because its size (%d) exceeds the event size limit (5MB)", 5200464), - mock.call("dropping event input/output because its size (%d) exceeds the event size limit (5MB)", 5200445), + mock.call("dropping event input/output because its size (%d) exceeds the event size limit (5MB)", 5200729), + mock.call("dropping event input/output because 
its size (%d) exceeds the event size limit (5MB)", 5200469), + mock.call("dropping event input/output because its size (%d) exceeds the event size limit (5MB)", 5200450), ] )
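The expected truncation sizes in the last two hunks each grow by exactly 5 bytes, which is the serialized width difference between the flat and nested span-kind keys, assuming the writer serializes events with json.dumps' default separators (a quick check, not taken from the patch itself):

import json

old = len(json.dumps({"span.kind": "llm"}))       # '{"span.kind": "llm"}' -> 20 bytes
new = len(json.dumps({"span": {"kind": "llm"}}))  # '{"span": {"kind": "llm"}}' -> 25 bytes
assert new - old == 5
assert 5200729 - 5200724 == 5200469 - 5200464 == 5200450 - 5200445 == 5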