1+ import json
2+ from datetime import timezone
13from pathlib import Path
2- from typing import Any , cast , overload
4+ from typing import Any , overload
35
6+ from key_value .shared .errors import DeserializationError
47from key_value .shared .utils .managed_entry import ManagedEntry
8+ from key_value .shared .utils .serialization import SerializationAdapter
59from typing_extensions import override
610
711from key_value .aio .stores .base import SEED_DATA_TYPE , BaseContextManagerStore , BaseStore
1317 raise ImportError (msg ) from e
1418
1519
class DuckDBSerializationAdapter(SerializationAdapter):
    """Adapter for DuckDB with support for native JSON and TEXT storage modes.

    The adapter maps the single logical ``value`` field onto two physical
    columns, ``value_dict`` (JSON column, native mode) and ``value_json``
    (TEXT column, stringified mode). Exactly one of them is populated on
    dump; on load, ``value_dict`` is preferred when both are present.
    """

    _native_storage: bool
    _value_column: str

    def __init__(self, *, native_storage: bool = True) -> None:
        """Initialize the DuckDB adapter.

        Args:
            native_storage: If True, use JSON column for native dict storage.
                If False, use TEXT column for stringified JSON.
        """
        super().__init__()

        self._native_storage = native_storage
        # NOTE(review): _date_format / _value_format are base-class settings;
        # presumably they control how dump_dict emits dates and values - confirm
        # against SerializationAdapter.
        self._date_format = "datetime"
        # Always use string format - DuckDB needs JSON strings for both TEXT and JSON columns
        self._value_format = "string"
        self._value_column = "value_dict" if native_storage else "value_json"

    @staticmethod
    def _parse_json_column(column: str, raw: str) -> Any:
        """Parse a JSON string read from ``column``.

        Malformed JSON is reported as a DeserializationError (chained to the
        original decode error) so that callers which handle adapter failures
        via DeserializationError also cover corrupt rows.

        Args:
            column: Name of the column the text came from (used in the error message).
            raw: The JSON text to parse.

        Returns:
            The decoded JSON value.

        Raises:
            DeserializationError: If ``raw`` is not valid JSON.
        """
        try:
            return json.loads(raw)
        except json.JSONDecodeError as e:
            msg = f"{column} does not contain valid JSON: {e}"
            raise DeserializationError(message=msg) from e

    @override
    def prepare_dump(self, data: dict[str, Any]) -> dict[str, Any]:
        """Prepare data for dumping to DuckDB.

        Moves the value to the appropriate column (value_dict or value_json)
        and sets the other column to None.

        Args:
            data: Serialized entry containing a ``value`` key. The dict is
                modified in place.

        Returns:
            The same dict, with ``value`` removed and both ``value_json`` and
            ``value_dict`` present (one of them None).

        Raises:
            KeyError: If ``data`` has no ``value`` key.
        """
        value = data.pop("value")

        # Set both columns to None, then populate the appropriate one
        data["value_json"] = None
        data["value_dict"] = None

        if self._native_storage:
            # For native storage, we pass the JSON string to DuckDB's JSON column
            # DuckDB will parse it and store it as native JSON
            data["value_dict"] = value
        else:
            # For TEXT storage, value should be a JSON string
            data["value_json"] = value

        return data

    @override
    def prepare_load(self, data: dict[str, Any]) -> dict[str, Any]:
        """Prepare data loaded from DuckDB for conversion to ManagedEntry.

        Extracts value from the appropriate column and handles timezone conversion
        for DuckDB's naive timestamps.

        Args:
            data: Row data keyed by column name. The dict is modified in place:
                ``value_json`` / ``value_dict`` are removed and ``value`` is set.

        Returns:
            The same dict, with ``value`` populated and any naive
            ``created_at`` / ``expires_at`` tagged as UTC.

        Raises:
            DeserializationError: If neither value column holds data, a value
                column has an unexpected type, or a value column contains
                malformed JSON.
        """
        value_json = data.pop("value_json", None)
        value_dict = data.pop("value_dict", None)

        # Determine which value column to use (prefer value_dict if present)
        if value_dict is not None:
            # Native storage mode - value_dict can be dict or string (DuckDB JSON returns as string)
            if isinstance(value_dict, dict):
                data["value"] = value_dict
            elif isinstance(value_dict, str):
                # DuckDB sometimes returns JSON as string, parse it
                data["value"] = self._parse_json_column("value_dict", value_dict)
            else:
                msg = f"value_dict has unexpected type: {type(value_dict)}"
                raise DeserializationError(message=msg)
        elif value_json is not None:
            # Stringified JSON mode - parse from string
            if isinstance(value_json, str):
                data["value"] = self._parse_json_column("value_json", value_json)
            else:
                msg = f"value_json has unexpected type: {type(value_json)}"
                raise DeserializationError(message=msg)
        else:
            msg = "Neither value_dict nor value_json column contains data"
            raise DeserializationError(message=msg)

        # DuckDB always returns naive timestamps, but ManagedEntry expects timezone-aware ones
        # Convert to timezone-aware UTC timestamps. Handle None values explicitly.
        created_at = data.get("created_at")
        if created_at is not None and created_at.tzinfo is None:
            data["created_at"] = created_at.replace(tzinfo=timezone.utc)

        expires_at = data.get("expires_at")
        if expires_at is not None and expires_at.tzinfo is None:
            data["expires_at"] = expires_at.replace(tzinfo=timezone.utc)

        return data
107+
108+
16109class DuckDBStore (BaseContextManagerStore , BaseStore ):
17110 """A DuckDB-based key-value store supporting both in-memory and persistent storage.
18111
@@ -35,7 +128,7 @@ class DuckDBStore(BaseContextManagerStore, BaseStore):
35128 _connection : duckdb .DuckDBPyConnection
36129 _is_closed : bool
37130 _owns_connection : bool
38- _native_storage : bool
131+ _adapter : SerializationAdapter
39132 _table_name : str
40133
41134 @overload
@@ -125,7 +218,7 @@ def __init__(
125218 self ._owns_connection = True
126219
127220 self ._is_closed = False
128- self ._native_storage = native_storage
221+ self ._adapter = DuckDBSerializationAdapter ( native_storage = native_storage )
129222 self ._table_name = table_name
130223 self ._stable_api = False
131224
@@ -239,8 +332,8 @@ async def _setup(self) -> None:
239332 async def _get_managed_entry (self , * , key : str , collection : str ) -> ManagedEntry | None :
240333 """Retrieve a managed entry by key from the specified collection.
241334
242- Reconstructs the ManagedEntry from value columns and metadata columns.
243- Tries value_dict first (native storage), falls back to value_json (stringified) .
335+ Reconstructs the ManagedEntry from value columns and metadata columns
336+ using the serialization adapter .
244337 """
245338 if self ._is_closed :
246339 msg = "Cannot operate on closed DuckDBStore"
@@ -254,45 +347,23 @@ async def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry
254347 if result is None :
255348 return None
256349
257- value_json , value_dict , created_at , ttl , expires_at = result
350+ value_json , value_dict , created_at , _ttl , expires_at = result
258351
259- # Determine which value column to use (prefer value_dict if present)
260- import json
261-
262- value : dict [str , Any ]
263- if value_dict is not None :
264- # Native storage mode - value_dict can be dict or string (DuckDB JSON returns as string)
265- if isinstance (value_dict , dict ):
266- value = cast (dict [str , Any ], value_dict )
267- elif isinstance (value_dict , str ):
268- # DuckDB sometimes returns JSON as string
269- value = json .loads (value_dict )
270- else :
271- msg = f"value_dict has unexpected type: { type (value_dict )} "
272- raise TypeError (msg )
273- elif value_json is not None :
274- # Stringified JSON mode - parse from string
275- value = json .loads (value_json )
276- else :
277- # Neither column has data - this shouldn't happen
278- return None
352+ # Build document dict for the adapter (exclude None values)
353+ document : dict [str , Any ] = {
354+ "value_json" : value_json ,
355+ "value_dict" : value_dict ,
356+ }
279357
280- # DuckDB always returns naive timestamps, but ManagedEntry expects timezone-aware ones
281- # Convert to timezone-aware UTC timestamps
282- from datetime import timezone
358+ if created_at is not None :
359+ document ["created_at" ] = created_at
360+ if expires_at is not None :
361+ document ["expires_at" ] = expires_at
283362
284- if created_at is not None and created_at .tzinfo is None :
285- created_at = created_at .replace (tzinfo = timezone .utc )
286- if expires_at is not None and expires_at .tzinfo is None :
287- expires_at = expires_at .replace (tzinfo = timezone .utc )
288-
289- # Reconstruct ManagedEntry with metadata from columns
290- return ManagedEntry (
291- value = value ,
292- created_at = created_at ,
293- ttl = ttl ,
294- expires_at = expires_at ,
295- )
363+ try :
364+ return self ._adapter .load_dict (data = document )
365+ except DeserializationError :
366+ return None
296367
297368 @override
298369 async def _put_managed_entry (
@@ -304,48 +375,27 @@ async def _put_managed_entry(
304375 ) -> None :
305376 """Store a managed entry by key in the specified collection.
306377
307- Stores the value and metadata separately:
308- - value_json/value_dict: Stores value based on native_storage setting
309- - created_at, ttl, expires_at: Stored in native columns for efficient querying
378+ Uses the serialization adapter to convert the ManagedEntry to the
379+ appropriate storage format.
310380 """
311381 if self ._is_closed :
312382 msg = "Cannot operate on closed DuckDBStore"
313383 raise RuntimeError (msg )
314384
315- # Store in appropriate column based on native_storage setting
316- value_json : str | None = None
317- value_dict : str | None = None
318-
319- if self ._native_storage :
320- # Native storage: store as JSON string in JSON column (DuckDB will handle as JSON type)
321- # We use value_as_json to ensure serialization errors are caught
322- value_dict = managed_entry .value_as_json
323- else :
324- # Stringified storage: store JSON string in TEXT column
325- value_json = managed_entry .value_as_json
326-
327- # Ensure timestamps are timezone-aware (convert naive to UTC if needed)
328- from datetime import timezone
329-
330- created_at = managed_entry .created_at
331- if created_at is not None and created_at .tzinfo is None :
332- created_at = created_at .replace (tzinfo = timezone .utc )
333-
334- expires_at = managed_entry .expires_at
335- if expires_at is not None and expires_at .tzinfo is None :
336- expires_at = expires_at .replace (tzinfo = timezone .utc )
385+ # Use adapter to dump the managed entry to a dict
386+ document = self ._adapter .dump_dict (entry = managed_entry , exclude_none = False )
337387
338388 # Insert or replace the entry with metadata in separate columns
339389 self ._connection .execute (
340390 self ._get_insert_sql (),
341391 [
342392 collection ,
343393 key ,
344- value_json ,
345- value_dict ,
346- created_at ,
394+ document [ " value_json" ] ,
395+ document [ " value_dict" ] ,
396+ document . get ( " created_at" ) ,
347397 managed_entry .ttl ,
348- expires_at ,
398+ document . get ( " expires_at" ) ,
349399 ],
350400 )
351401
0 commit comments