# Durable Execution with DBOS

[DBOS](https://www.dbos.dev/) is a lightweight [durable execution](https://docs.dbos.dev/architecture) library natively integrated with Pydantic AI.

## Durable Execution

DBOS workflows make your program **durable** by checkpointing its state in a database. If your program ever fails, all of your workflows will automatically resume from the last completed step when it restarts.

* **Workflows** must be deterministic and generally cannot include I/O.
* **Steps** may perform I/O (network, disk, API calls). If a step fails, it restarts from the beginning.

Every workflow input and step output is durably stored in the system database. When workflow execution fails, whether from crashes, network issues, or server restarts, DBOS uses these checkpoints to recover workflows from their last completed step.
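
For example, here's a minimal sketch of a plain DBOS workflow composed of two steps (the function names and logic are illustrative, not part of the Pydantic AI integration):

```python {test="skip"}
import urllib.request

from dbos import DBOS


@DBOS.step()
def fetch_page(url: str) -> str:
    # Steps may perform I/O; if this step fails, it restarts from its beginning.
    with urllib.request.urlopen(url) as resp:
        return resp.read().decode()


@DBOS.step()
def truncate(text: str) -> str:
    return text[:100]


@DBOS.workflow()
def page_summary_workflow(url: str) -> str:
    # The workflow stays deterministic: all I/O happens in steps, and each
    # step's output is checkpointed so recovery skips completed steps.
    text = fetch_page(url)
    return truncate(text)
```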

DBOS **queues** provide durable, database-backed alternatives to systems like Celery or BullMQ, supporting features such as concurrency limits, rate limits, timeouts, and prioritization. See the [DBOS docs](https://docs.dbos.dev/architecture) for details.
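
As a sketch (the queue name and workflow are illustrative), enqueueing a workflow on a DBOS queue looks like this:

```python {test="skip"}
from dbos import DBOS, Queue

queue = Queue('summaries', concurrency=2)  # at most 2 tasks run concurrently


@DBOS.workflow()
def summarize_url(url: str) -> str:
    return f'summary of {url}'  # placeholder for real work


# enqueue() checkpoints the task in the database and returns a handle;
# the task survives crashes and runs as workers become available.
handle = queue.enqueue(summarize_url, 'https://example.com')
result = handle.get_result()
```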

The diagram below shows the overall architecture of an agentic application in DBOS.
DBOS runs fully in-process as a library. Functions remain normal Python functions but are checkpointed into a database (Postgres or SQLite).

```text
                        Clients
                (HTTP, RPC, Kafka, etc.)
                           |
                           v
+------------------------------------------------------+
|                 Application Servers                  |
|                                                      |
|    +----------------------------------------------+  |
|    |         Pydantic AI + DBOS Libraries         |  |
|    |                                              |  |
|    |  [ Workflows (Agent Run Loop) ]              |  |
|    |  [ Steps (Tool, MCP, Model) ]                |  |
|    |  [ Queues ] [ Cron Jobs ] [ Messaging ]      |  |
|    +----------------------------------------------+  |
|                                                      |
+------------------------------------------------------+
                           |
                           v
+------------------------------------------------------+
|                       Database                       |
|  (Stores workflow and step state, schedules tasks)   |
+------------------------------------------------------+
```

See the [DBOS documentation](https://docs.dbos.dev/architecture) for more information.

## Durable Agent

Any agent can be wrapped in a [`DBOSAgent`][pydantic_ai.durable_exec.dbos.DBOSAgent] to get durable execution. `DBOSAgent` automatically:

* Wraps `Agent.run` and `Agent.run_sync` as DBOS workflows.
* Wraps [model requests](../models/overview.md) and [MCP communication](../mcp/client.md) as DBOS steps.

Custom tool functions and event stream handlers are **not automatically wrapped** by DBOS.
If they involve non-deterministic behavior or perform I/O, you should explicitly decorate them with `@DBOS.step`, as in the sketch below.
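
For example, a tool that performs network I/O can be wrapped as a step like this (the tool body is a placeholder):

```python {test="skip"}
from dbos import DBOS

from pydantic_ai import Agent

agent = Agent('gpt-5', name='weather')


@agent.tool_plain
@DBOS.step()
def get_weather(city: str) -> str:
    # Wrapped as a DBOS step: once checkpointed, a completed call
    # won't re-execute during workflow recovery.
    return f'Sunny in {city}'  # placeholder for a real API call
```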

The original agent, model, and MCP server can still be used as normal outside the DBOS workflow.

Here is a simple but complete example of wrapping an agent for durable execution. All it requires is installing Pydantic AI with the DBOS [open-source library](https://github.com/dbos-inc/dbos-transact-py):

```bash
pip/uv-add pydantic-ai[dbos]
```

Or if you're using the slim package, you can install it with the `dbos` optional group:

```bash
pip/uv-add pydantic-ai-slim[dbos]
```

```python {title="dbos_agent.py" test="skip"}
from dbos import DBOS, DBOSConfig

from pydantic_ai import Agent
from pydantic_ai.durable_exec.dbos import DBOSAgent

dbos_config: DBOSConfig = {
    'name': 'pydantic_dbos_agent',
    'system_database_url': 'sqlite:///dbostest.sqlite',  # (3)!
}
DBOS(config=dbos_config)

agent = Agent(
    'gpt-5',
    instructions="You're an expert in geography.",
    name='geography',  # (4)!
)

dbos_agent = DBOSAgent(agent)  # (1)!


async def main():
    DBOS.launch()
    result = await dbos_agent.run('What is the capital of Mexico?')  # (2)!
    print(result.output)
    #> Mexico City (Ciudad de México, CDMX)
```

1. Workflows and the `DBOSAgent` instance must be defined before `DBOS.launch()` so that recovery can correctly find all workflows.
2. [`DBOSAgent.run()`][pydantic_ai.durable_exec.dbos.DBOSAgent.run] works like [`Agent.run()`][pydantic_ai.Agent.run], but runs as a DBOS workflow and executes model requests, decorated tool calls, and MCP communication as DBOS steps.
3. This example uses SQLite. Postgres is recommended for production.
4. The agent's `name` is used to uniquely identify its workflows.

_(This example is complete, it can be run "as is" — you'll need to add `asyncio.run(main())` to run `main`)_

Because DBOS workflows must be defined before `DBOS.launch()` is called, and `DBOSAgent` automatically registers `run` and `run_sync` as workflows, the `DBOSAgent` instance also needs to be created before `DBOS.launch()`.

For more information on how to use DBOS in Python applications, see their [Python SDK guide](https://docs.dbos.dev/python/programming-guide).

## DBOS Integration Considerations

When using DBOS with Pydantic AI agents, there are a few important considerations to ensure workflows and toolsets behave correctly.

### Agent and Toolset Requirements

Each agent instance must have a unique `name` so DBOS can correctly resume workflows after a failure or restart.

Tools and event stream handlers are not automatically wrapped by DBOS. You can decide how to integrate them:

* Decorate with `@DBOS.step` if the function involves non-determinism or I/O.
* Skip the decorator if durability isn't needed, so you avoid the extra DB checkpoint write.
* If the function needs to enqueue tasks or invoke other DBOS workflows, run it inside the agent's main workflow (not as a step), as in the sketch after this list.
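
For instance, here's a minimal sketch of that last case, assuming a hypothetical `research_topic` workflow and `research_tasks` queue:

```python {test="skip"}
from dbos import DBOS, Queue

from pydantic_ai import Agent

queue = Queue('research_tasks')
agent = Agent('gpt-5', name='researcher')


@DBOS.workflow()
def research_topic(topic: str) -> str:
    return f'notes on {topic}'  # placeholder for real multi-step work


@agent.tool_plain
def fan_out_research(topics: list[str]) -> str:
    # Deliberately NOT decorated with @DBOS.step: enqueueing workflows must
    # happen from the agent's main workflow, not from inside a step.
    handles = [queue.enqueue(research_topic, topic) for topic in topics]
    return '\n'.join(handle.get_result() for handle in handles)
```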

Other than that, any agent and toolset will just work!

### Agent Run Context and Dependencies

DBOS checkpoints workflow inputs/outputs and step outputs into a database using `jsonpickle`. This means that the [dependencies](../dependencies.md) object provided to [`DBOSAgent.run()`][pydantic_ai.durable_exec.dbos.DBOSAgent.run] or [`DBOSAgent.run_sync()`][pydantic_ai.durable_exec.dbos.DBOSAgent.run_sync], as well as all tool outputs, must be serializable with `jsonpickle`. You may also want to keep inputs and outputs small (under \~2 MB): PostgreSQL and SQLite support up to 1 GB per field, but large objects may impact performance.
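
For example, a simple dataclass of plain values serializes cleanly (the fields shown are illustrative):

```python {test="skip"}
from dataclasses import dataclass

from pydantic_ai import Agent
from pydantic_ai.durable_exec.dbos import DBOSAgent


@dataclass
class Deps:
    # Plain data fields round-trip through jsonpickle; avoid holding live
    # resources like open connections or API clients here.
    user_id: int
    api_base_url: str


agent = Agent('gpt-5', deps_type=Deps, name='deps_example')
dbos_agent = DBOSAgent(agent)
# After DBOS.launch(), run with:
# await dbos_agent.run('...', deps=Deps(user_id=1, api_base_url='https://example.com'))
```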

### Streaming

Because DBOS cannot stream output directly to the workflow or step call site, [`Agent.run_stream()`][pydantic_ai.Agent.run_stream] is not supported when running inside a DBOS workflow.

Instead, you can implement streaming by setting an [`event_stream_handler`][pydantic_ai.agent.EventStreamHandler] on the `Agent` or `DBOSAgent` instance and using [`DBOSAgent.run()`][pydantic_ai.durable_exec.dbos.DBOSAgent.run].
The event stream handler function will receive the agent [run context][pydantic_ai.tools.RunContext] and an async iterable of events from the model's streaming response and the agent's execution of tools. For examples, see the [streaming docs](../agents.md#streaming-all-events).
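
For example, a handler that prints each event as it arrives might look like this (a sketch, assuming the handler is set via the `Agent` constructor):

```python {test="skip"}
from collections.abc import AsyncIterable

from pydantic_ai import Agent, RunContext
from pydantic_ai.messages import AgentStreamEvent


async def event_stream_handler(
    ctx: RunContext[None],
    stream: AsyncIterable[AgentStreamEvent],
):
    async for event in stream:
        # Forward events to a websocket, SSE response, terminal, etc.
        print(event)


agent = Agent('gpt-5', name='streamer', event_stream_handler=event_stream_handler)
```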

## Step Configuration

You can customize DBOS step behavior, such as retries, by passing [`StepConfig`][pydantic_ai.durable_exec.dbos.StepConfig] objects to the `DBOSAgent` constructor (a sketch follows the list below):

* `mcp_step_config`: The DBOS step config to use for MCP server communication. No retries if omitted.
* `model_step_config`: The DBOS step config to use for model request steps. No retries if omitted.
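
Here's a minimal sketch, assuming `StepConfig` mirrors the retry parameters of DBOS's `@DBOS.step` decorator (`retries_allowed`, `max_attempts`, `interval_seconds`, `backoff_rate`):

```python {test="skip"}
from pydantic_ai import Agent
from pydantic_ai.durable_exec.dbos import DBOSAgent, StepConfig

agent = Agent('gpt-5', name='retrying_agent')

dbos_agent = DBOSAgent(
    agent,
    model_step_config=StepConfig(
        retries_allowed=True,  # field names assumed, mirroring @DBOS.step
        max_attempts=3,
        interval_seconds=1.0,
        backoff_rate=2.0,
    ),
)
```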

For custom tools, you can annotate them directly with [`@DBOS.step`](https://docs.dbos.dev/python/reference/decorators#step) or [`@DBOS.workflow`](https://docs.dbos.dev/python/reference/decorators#workflow) decorators as needed. These decorators have no effect outside DBOS workflows, so tools remain usable in non-DBOS agents.

## Step Retries

On top of the step retries that DBOS can perform on request failures, Pydantic AI and various provider API clients have their own request retry logic. Enabling these at the same time may cause requests to be retried more often than expected, with improper `Retry-After` handling.

When using DBOS, it's recommended not to use [HTTP Request Retries](../retries.md) and to turn off your provider API client's own retry logic, for example by setting `max_retries=0` on a [custom `OpenAIProvider` API client](../models/openai.md#custom-openai-client), as sketched below.
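
For example, with OpenAI you might construct the model with a client whose retries are disabled (a sketch following the OpenAI model docs; adjust for your provider):

```python {test="skip"}
from openai import AsyncOpenAI

from pydantic_ai import Agent
from pydantic_ai.models.openai import OpenAIChatModel
from pydantic_ai.providers.openai import OpenAIProvider

client = AsyncOpenAI(max_retries=0)  # turn off the OpenAI SDK's own retries
model = OpenAIChatModel('gpt-5', provider=OpenAIProvider(openai_client=client))
agent = Agent(model, name='no_client_retries')
```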

You can customize DBOS's retry policy using [step configuration](#step-configuration).

## Observability with Logfire

When using [Pydantic Logfire](../logfire.md), we **recommend disabling DBOS's built-in OpenTelemetry tracing**.
DBOS automatically wraps workflow and step execution in spans, while Pydantic AI and Logfire already emit spans for the same function calls, model requests, and tool invocations. Without disabling DBOS tracing, these operations may appear twice in your trace tree.

To disable DBOS traces and logs, you can set `disable_otlp=True` in `DBOSConfig`. For example:

```python {title="dbos_no_traces.py" test="skip"}
from dbos import DBOS, DBOSConfig

dbos_config: DBOSConfig = {
    'name': 'pydantic_dbos_agent',
    'system_database_url': 'sqlite:///dbostest.sqlite',
    'disable_otlp': True,  # (1)!
}
DBOS(config=dbos_config)
```

1. If `True`, disables OpenTelemetry tracing and logging for DBOS. Default is `False`.