Skip to content
4 changes: 0 additions & 4 deletions ddtrace/contrib/internal/anthropic/utils.py

This file was deleted.

46 changes: 24 additions & 22 deletions ddtrace/llmobs/_integrations/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from ddtrace.llmobs._integrations.base import BaseLLMIntegration
from ddtrace.llmobs._integrations.utils import update_proxy_workflow_input_output_value
from ddtrace.llmobs._utils import _get_attr
from ddtrace.llmobs.utils import ToolCall
from ddtrace.llmobs.utils import Message, ToolCall
from ddtrace.llmobs.utils import ToolDefinition
from ddtrace.llmobs.utils import ToolResult
from ddtrace.trace import Span
Expand Down Expand Up @@ -71,7 +71,7 @@ def _llmobs_set_tags(
system_prompt = kwargs.get("system")
input_messages = self._extract_input_message(messages, system_prompt)

output_messages = [{"content": ""}]
output_messages: List[Message] = [Message(content="")]
if not span.error and response is not None:
output_messages = self._extract_output_message(response)
span_kind = "workflow" if span._get_ctx_item(PROXY_REQUEST) else "llm"
Expand All @@ -92,14 +92,16 @@ def _llmobs_set_tags(
)
update_proxy_workflow_input_output_value(span, span_kind)

def _extract_input_message(self, messages, system_prompt: Optional[Union[str, List[Dict[str, Any]]]] = None):
def _extract_input_message(
self, messages, system_prompt: Optional[Union[str, List[Dict[str, Any]]]] = None
) -> List[Message]:
"""Extract input messages from the stored prompt.
Anthropic allows for messages and multiple texts in a message, which requires some special casing.
"""
if not isinstance(messages, Iterable):
log.warning("Anthropic input must be a list of messages.")

input_messages = []
input_messages: List[Message] = []
if system_prompt is not None:
messages = [{"content": system_prompt, "role": "system"}] + messages

Expand All @@ -115,43 +117,43 @@ def _extract_input_message(self, messages, system_prompt: Optional[Union[str, Li
log.warning("Anthropic input message must have content and role.")

if isinstance(content, str):
input_messages.append({"content": content, "role": role})
input_messages.append(Message(content=content, role=str(role)))

elif isinstance(content, list):
for block in content:
if _get_attr(block, "type", None) == "text":
input_messages.append({"content": _get_attr(block, "text", ""), "role": role})
input_messages.append(Message(content=_get_attr(block, "text", ""), role=str(role)))
Copy link
Contributor Author

@ncybul ncybul Sep 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a type hint here that _get_attr(block, "text", "") is not necessarily a string. I tried coercing it with str() but that makes the test_tools_sync_stream fail because the expected content is a list. This makes me think I should update this line and the test to be consistent with the string format.

I am also confused as to why this tool use is formatted as a text type in the anthropic stream to begin with 🤔


elif _get_attr(block, "type", None) == "image":
# Store a placeholder for potentially enormous binary image data.
input_messages.append({"content": "([IMAGE DETECTED])", "role": role})
input_messages.append(Message(content="([IMAGE DETECTED])", role=str(role)))

elif _get_attr(block, "type", None) == "tool_use":
text = _get_attr(block, "text", None)
input_data = _get_attr(block, "input", "")
if isinstance(input_data, str):
input_data = json.loads(input_data)
tool_call_info = ToolCall(
name=_get_attr(block, "name", ""),
name=str(_get_attr(block, "name", "")),
arguments=input_data,
tool_id=_get_attr(block, "id", ""),
type=_get_attr(block, "type", ""),
tool_id=str(_get_attr(block, "id", "")),
type=str(_get_attr(block, "type", "")),
)
if text is None:
text = ""
input_messages.append({"content": text, "role": role, "tool_calls": [tool_call_info]})
input_messages.append(Message(content=str(text), role=str(role), tool_calls=[tool_call_info]))

elif _get_attr(block, "type", None) == "tool_result":
content = _get_attr(block, "content", None)
formatted_content = self._format_tool_result_content(content)
tool_result_info = ToolResult(
result=formatted_content,
tool_id=_get_attr(block, "tool_use_id", ""),
tool_id=str(_get_attr(block, "tool_use_id", "")),
type="tool_result",
)
input_messages.append({"content": "", "role": role, "tool_results": [tool_result_info]})
input_messages.append(Message(content="", role=str(role), tool_results=[tool_result_info]))
else:
input_messages.append({"content": str(block), "role": role})
input_messages.append(Message(content=str(block), role=str(role)))

return input_messages

Expand All @@ -169,34 +171,34 @@ def _format_tool_result_content(self, content) -> str:
return ",".join(formatted_content)
return str(content)

def _extract_output_message(self, response):
def _extract_output_message(self, response) -> List[Message]:
"""Extract output messages from the stored response."""
output_messages = []
output_messages: List[Message] = []
content = _get_attr(response, "content", "")
role = _get_attr(response, "role", "")

if isinstance(content, str):
return [{"content": content, "role": role}]
return [Message(content=content, role=str(role))]

elif isinstance(content, list):
for completion in content:
text = _get_attr(completion, "text", None)
if isinstance(text, str):
output_messages.append({"content": text, "role": role})
output_messages.append(Message(content=text, role=str(role)))
else:
if _get_attr(completion, "type", None) == "tool_use":
input_data = _get_attr(completion, "input", "")
if isinstance(input_data, str):
input_data = json.loads(input_data)
tool_call_info = ToolCall(
name=_get_attr(completion, "name", ""),
name=str(_get_attr(completion, "name", "")),
arguments=input_data,
tool_id=_get_attr(completion, "id", ""),
type=_get_attr(completion, "type", ""),
tool_id=str(_get_attr(completion, "id", "")),
type=str(_get_attr(completion, "type", "")),
)
if text is None:
text = ""
output_messages.append({"content": text, "role": role, "tool_calls": [tool_call_info]})
output_messages.append(Message(content=str(text), role=str(role), tool_calls=[tool_call_info]))
return output_messages

def _extract_usage(self, span: Span, usage: Dict[str, Any]):
Expand Down
45 changes: 23 additions & 22 deletions ddtrace/llmobs/_integrations/bedrock.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from ddtrace.llmobs._telemetry import record_bedrock_agent_span_event_created
from ddtrace.llmobs._utils import _get_attr
from ddtrace.llmobs._writer import LLMObsSpanEvent
from ddtrace.llmobs.utils import ToolDefinition
from ddtrace.llmobs.utils import Message, ToolDefinition
from ddtrace.trace import Span


Expand Down Expand Up @@ -110,7 +110,7 @@ def _llmobs_set_tags(
self._extract_input_message_for_converse(prompt) if is_converse else self._extract_input_message(prompt)
)

output_messages = [{"content": ""}]
output_messages: List[Message] = [Message(content="")]
if not span.error and response is not None:
if ctx["resource"] == "Converse":
output_messages = self._extract_output_message_for_converse(response)
Expand Down Expand Up @@ -191,7 +191,7 @@ def translate_bedrock_traces(self, traces, root_span) -> None:
self._active_span_by_step_id.clear()

@staticmethod
def _extract_input_message_for_converse(prompt: List[Dict[str, Any]]):
def _extract_input_message_for_converse(prompt: List[Dict[str, Any]]) -> List[Message]:
"""Extract input messages from the stored prompt for converse

`prompt` is an array of `message` objects. Each `message` has a role and content field.
Expand All @@ -203,8 +203,8 @@ def _extract_input_message_for_converse(prompt: List[Dict[str, Any]]):
"""
if not isinstance(prompt, list):
log.warning("Bedrock input is not a list of messages or a string.")
return [{"content": ""}]
input_messages = []
return [Message(content="")]
input_messages: List[Message] = []
for message in prompt:
if not isinstance(message, dict):
continue
Expand All @@ -226,7 +226,7 @@ def _extract_output_message_for_converse(response: Dict[str, Any]):
For more info, see bedrock converse response syntax:
https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_Converse.html#API_runtime_Converse_ResponseSyntax
"""
default_content = [{"content": ""}]
default_content: List[Message] = [Message(content="")]
message = response.get("output", {}).get("message", {})
if not message:
return default_content
Expand All @@ -241,7 +241,7 @@ def _converse_output_stream_processor() -> (
Generator[
None,
Dict[str, Any],
Tuple[List[Dict[str, Any]], Dict[str, str], Dict[str, int]],
Tuple[List[Message], Dict[str, str], Dict[str, int]],
]
):
"""
Expand All @@ -259,7 +259,7 @@ def _converse_output_stream_processor() -> (
"""
usage_metrics: Dict[str, int] = {}
metadata: Dict[str, str] = {}
messages: List[Dict[str, Any]] = []
messages: List[Message] = []

text_content_blocks: Dict[int, str] = {}
tool_content_blocks: Dict[int, Dict[str, Any]] = {}
Expand Down Expand Up @@ -336,47 +336,48 @@ def _converse_output_stream_processor() -> (
)

if not messages:
messages.append({"role": "assistant", "content": ""})
messages.append(Message(content="", role="assistant"))

normalize_input_tokens(usage_metrics)
return messages, metadata, usage_metrics

@staticmethod
def _extract_input_message(prompt):
def _extract_input_message(prompt) -> List[Message]:
"""Extract input messages from the stored prompt.
Anthropic allows for messages and multiple texts in a message, which requires some special casing.
"""
if isinstance(prompt, str):
return [{"content": prompt}]
return [Message(content=prompt)]
if not isinstance(prompt, list):
log.warning("Bedrock input is not a list of messages or a string.")
return [{"content": ""}]
input_messages = []
return [Message(content="")]
input_messages: List[Message] = []
for p in prompt:
content = p.get("content", "")
if isinstance(content, list) and isinstance(content[0], dict):
for entry in content:
if entry.get("type") == "text":
input_messages.append({"content": entry.get("text", ""), "role": str(p.get("role", ""))})
input_messages.append(Message(content=entry.get("text", ""), role=str(p.get("role", ""))))
elif entry.get("type") == "image":
# Store a placeholder for potentially enormous binary image data.
input_messages.append({"content": "([IMAGE DETECTED])", "role": str(p.get("role", ""))})
input_messages.append(Message(content="([IMAGE DETECTED])", role=str(p.get("role", ""))))
else:
input_messages.append({"content": content, "role": str(p.get("role", ""))})
input_messages.append(Message(content=str(content), role=str(p.get("role", ""))))
return input_messages

@staticmethod
def _extract_output_message(response):
def _extract_output_message(response) -> List[Message]:
"""Extract output messages from the stored response.
Anthropic allows for chat messages, which requires some special casing.
"""
if isinstance(response["text"], str):
return [{"content": response["text"]}]
return [Message(content=response["text"])]
if isinstance(response["text"], list):
if isinstance(response["text"][0], str):
return [{"content": str(content)} for content in response["text"]]
return [Message(content=str(content)) for content in response["text"]]
if isinstance(response["text"][0], dict):
return [{"content": response["text"][0].get("text", "")}]
return [Message(content=response["text"][0].get("text", ""))]
return []

def _get_base_url(self, **kwargs: Dict[str, Any]) -> Optional[str]:
instance = kwargs.get("instance")
Expand All @@ -396,8 +397,8 @@ def _extract_tool_definitions(self, tool_config: Dict[str, Any]) -> List[ToolDef
for tool in tools:
tool_spec = _get_attr(tool, "toolSpec", {})
tool_definition_info = ToolDefinition(
name=_get_attr(tool_spec, "name", ""),
description=_get_attr(tool_spec, "description", ""),
name=str(_get_attr(tool_spec, "name", "")),
description=str(_get_attr(tool_spec, "description", "")),
schema=_get_attr(tool_spec, "inputSchema", {}),
)
tool_definitions.append(tool_definition_info)
Expand Down
Loading
Loading