-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Add metadata to the Agent class. #3370
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -356,3 +356,20 @@ Agent.instrument_all(instrumentation_settings) | |
| ``` | ||
|
|
||
| This setting is particularly useful in production environments where compliance requirements or data sensitivity concerns make it necessary to limit what content is sent to your observability platform. | ||
|
|
||
| ### Adding Custom Metadata | ||
|
|
||
| Use the agent's `metadata` parameter to attach additional data to the agent's span. | ||
| Metadata can be provided as a string, a dictionary, or a callable that reads the [`RunContext`][pydantic_ai.tools.RunContext] to compute values on each run. | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'd prefer to only support |
||
|
|
||
| ```python {hl_lines="4-5"} | ||
| from pydantic_ai import Agent | ||
|
|
||
| agent = Agent( | ||
| 'openai:gpt-5', | ||
| instrument=True, | ||
| metadata=lambda ctx: {'deployment': 'staging', 'tenant': ctx.deps.tenant}, | ||
| ) | ||
| ``` | ||
|
|
||
| When instrumentation is enabled, the resolved metadata is recorded on the agent span under the `logfire.agent.metadata` attribute. | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I may still change my mind on this, but I don't think it needs the |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,6 +32,7 @@ | |
| HistoryProcessor, | ||
| ModelRequestNode, | ||
| UserPromptNode, | ||
| build_run_context, | ||
| capture_run_messages, | ||
| ) | ||
| from .._output import OutputToolset | ||
|
|
@@ -89,6 +90,8 @@ | |
| S = TypeVar('S') | ||
| NoneType = type(None) | ||
|
|
||
| AgentMetadataValue = str | dict[str, str] | Callable[[RunContext[AgentDepsT]], str | dict[str, str]] | ||
DouweM marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
|
|
||
| @dataclasses.dataclass(init=False) | ||
| class Agent(AbstractAgent[AgentDepsT, OutputDataT]): | ||
|
|
@@ -130,6 +133,7 @@ class Agent(AbstractAgent[AgentDepsT, OutputDataT]): | |
| """Options to automatically instrument with OpenTelemetry.""" | ||
|
|
||
| _instrument_default: ClassVar[InstrumentationSettings | bool] = False | ||
| _metadata: AgentMetadataValue[AgentDepsT] | None = dataclasses.field(repr=False) | ||
|
|
||
| _deps_type: type[AgentDepsT] = dataclasses.field(repr=False) | ||
| _output_schema: _output.OutputSchema[OutputDataT] = dataclasses.field(repr=False) | ||
|
|
@@ -175,6 +179,7 @@ def __init__( | |
| defer_model_check: bool = False, | ||
| end_strategy: EndStrategy = 'early', | ||
| instrument: InstrumentationSettings | bool | None = None, | ||
| metadata: AgentMetadataValue[AgentDepsT] | None = None, | ||
| history_processors: Sequence[HistoryProcessor[AgentDepsT]] | None = None, | ||
| event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, | ||
| ) -> None: ... | ||
|
|
@@ -201,6 +206,7 @@ def __init__( | |
| defer_model_check: bool = False, | ||
| end_strategy: EndStrategy = 'early', | ||
| instrument: InstrumentationSettings | bool | None = None, | ||
| metadata: AgentMetadataValue[AgentDepsT] | None = None, | ||
| history_processors: Sequence[HistoryProcessor[AgentDepsT]] | None = None, | ||
| event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, | ||
| ) -> None: ... | ||
|
|
@@ -225,6 +231,7 @@ def __init__( | |
| defer_model_check: bool = False, | ||
| end_strategy: EndStrategy = 'early', | ||
| instrument: InstrumentationSettings | bool | None = None, | ||
| metadata: AgentMetadataValue[AgentDepsT] | None = None, | ||
| history_processors: Sequence[HistoryProcessor[AgentDepsT]] | None = None, | ||
| event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, | ||
| **_deprecated_kwargs: Any, | ||
|
|
@@ -276,6 +283,10 @@ def __init__( | |
| [`Agent.instrument_all()`][pydantic_ai.Agent.instrument_all] | ||
| will be used, which defaults to False. | ||
| See the [Debugging and Monitoring guide](https://ai.pydantic.dev/logfire/) for more info. | ||
| metadata: Optional metadata to attach to telemetry for this agent. | ||
| Provide a string literal, a dict of string keys and values, or a callable returning one of those values | ||
| computed from the [`RunContext`][pydantic_ai.tools.RunContext] on each run. | ||
| Metadata is only recorded when instrumentation is enabled. | ||
| history_processors: Optional list of callables to process the message history before sending it to the model. | ||
| Each processor takes a list of messages and returns a modified list of messages. | ||
| Processors can be sync or async and are applied in sequence. | ||
|
|
@@ -292,6 +303,7 @@ def __init__( | |
|
|
||
| self._output_type = output_type | ||
| self.instrument = instrument | ||
| self._metadata = metadata | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not sure about the sunder, and that it's not represented in repr. I went by feeling. |
||
| self._deps_type = deps_type | ||
|
|
||
| if mcp_servers := _deprecated_kwargs.pop('mcp_servers', None): | ||
|
|
@@ -349,6 +361,9 @@ def __init__( | |
| self._override_instructions: ContextVar[ | ||
| _utils.Option[list[str | _system_prompt.SystemPromptFunc[AgentDepsT]]] | ||
| ] = ContextVar('_override_instructions', default=None) | ||
| self._override_metadata: ContextVar[_utils.Option[AgentMetadataValue[AgentDepsT]]] = ContextVar( | ||
| '_override_metadata', default=None | ||
| ) | ||
|
|
||
| self._enter_lock = Lock() | ||
| self._entered_count = 0 | ||
|
|
@@ -645,6 +660,7 @@ async def get_instructions(run_context: RunContext[AgentDepsT]) -> str | None: | |
| }, | ||
| ) | ||
|
|
||
| run_metadata: str | dict[str, str] | None = None | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We could technically set a non-callable metadata even if the run fails, and the ctx isn't available. What do you think? I opted for the simpler implementation.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If the run fails, wouldn't we still have the `ctx`? We should also document explicitly whether the callable is executed at the start or end of the run, as RunContext would change (e.g. messages, usage); I think at the end is best. |
||
| try: | ||
| async with graph.iter( | ||
| inputs=user_prompt_node, | ||
|
|
@@ -656,8 +672,10 @@ async def get_instructions(run_context: RunContext[AgentDepsT]) -> str | None: | |
| async with toolset: | ||
| agent_run = AgentRun(graph_run) | ||
| yield agent_run | ||
| if (final_result := agent_run.result) is not None and run_span.is_recording(): | ||
| if instrumentation_settings and instrumentation_settings.include_content: | ||
| final_result = agent_run.result | ||
| if instrumentation_settings and run_span.is_recording(): | ||
| run_metadata = self._compute_agent_metadata(build_run_context(agent_run.ctx)) | ||
| if instrumentation_settings.include_content and final_result is not None: | ||
| run_span.set_attribute( | ||
| 'final_result', | ||
| ( | ||
|
|
@@ -671,18 +689,32 @@ async def get_instructions(run_context: RunContext[AgentDepsT]) -> str | None: | |
| if instrumentation_settings and run_span.is_recording(): | ||
| run_span.set_attributes( | ||
| self._run_span_end_attributes( | ||
| instrumentation_settings, usage, state.message_history, graph_deps.new_message_index | ||
| instrumentation_settings, | ||
| usage, | ||
| state.message_history, | ||
| graph_deps.new_message_index, | ||
| run_metadata, | ||
| ) | ||
| ) | ||
| finally: | ||
| run_span.end() | ||
|
|
||
| def _compute_agent_metadata(self, ctx: RunContext[AgentDepsT]) -> str | dict[str, str] | None: | ||
| metadata_override = self._override_metadata.get() | ||
| metadata_config = metadata_override.value if metadata_override is not None else self._metadata | ||
| if metadata_config is None: | ||
| return None | ||
|
|
||
| metadata = metadata_config(ctx) if callable(metadata_config) else metadata_config | ||
| return metadata | ||
|
|
||
| def _run_span_end_attributes( | ||
| self, | ||
| settings: InstrumentationSettings, | ||
| usage: _usage.RunUsage, | ||
| message_history: list[_messages.ModelMessage], | ||
| new_message_index: int, | ||
| metadata: str | dict[str, str] | None = None, | ||
| ): | ||
| if settings.version == 1: | ||
| attrs = { | ||
|
|
@@ -716,6 +748,12 @@ def _run_span_end_attributes( | |
| ): | ||
| attrs['pydantic_ai.variable_instructions'] = True | ||
|
|
||
| if metadata is not None: | ||
| if isinstance(metadata, dict): | ||
| attrs['logfire.agent.metadata'] = json.dumps(metadata) | ||
| else: | ||
| attrs['logfire.agent.metadata'] = metadata | ||
|
|
||
| return { | ||
| **usage.opentelemetry_attributes(), | ||
| **attrs, | ||
|
|
@@ -740,6 +778,7 @@ def override( | |
| toolsets: Sequence[AbstractToolset[AgentDepsT]] | _utils.Unset = _utils.UNSET, | ||
| tools: Sequence[Tool[AgentDepsT] | ToolFuncEither[AgentDepsT, ...]] | _utils.Unset = _utils.UNSET, | ||
| instructions: Instructions[AgentDepsT] | _utils.Unset = _utils.UNSET, | ||
| metadata: AgentMetadataValue[AgentDepsT] | _utils.Unset = _utils.UNSET, | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If we're doing |
||
| ) -> Iterator[None]: | ||
| """Context manager to temporarily override agent name, dependencies, model, toolsets, tools, instructions, or metadata. | ||
|
|
||
|
|
@@ -753,6 +792,7 @@ def override( | |
| toolsets: The toolsets to use instead of the toolsets passed to the agent constructor and agent run. | ||
| tools: The tools to use instead of the tools registered with the agent. | ||
| instructions: The instructions to use instead of the instructions registered with the agent. | ||
| metadata: The metadata to use instead of the metadata passed to the agent constructor. | ||
| """ | ||
| if _utils.is_set(name): | ||
| name_token = self._override_name.set(_utils.Some(name)) | ||
|
|
@@ -785,6 +825,11 @@ def override( | |
| else: | ||
| instructions_token = None | ||
|
|
||
| if _utils.is_set(metadata): | ||
| metadata_token = self._override_metadata.set(_utils.Some(metadata)) | ||
| else: | ||
| metadata_token = None | ||
|
|
||
| try: | ||
| yield | ||
| finally: | ||
|
|
@@ -800,6 +845,8 @@ def override( | |
| self._override_tools.reset(tools_token) | ||
| if instructions_token is not None: | ||
| self._override_instructions.reset(instructions_token) | ||
| if metadata_token is not None: | ||
| self._override_metadata.reset(metadata_token) | ||
|
|
||
| @overload | ||
| def instructions( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right now this is very instrumentation specific, although the name intentionally isn't. I think we should also make the resulting metadata available on
`AgentRunResult` (and the other classes that have a `run_id` field) so the user can read/store it after the fact.