Merged
Changes from all commits
30 commits
eea032b
Implement client ownership split for store lifecycle management
github-actions[bot] Nov 10, 2025
c6c6bea
Merge branch 'main' into claude/issue-232-20251110-1652
strawgate Nov 11, 2025
b08040b
refactor: simplify client ownership by initializing flag in base class
github-actions[bot] Nov 11, 2025
bb365a3
refactor: pass client_provided_by_user and stable_api as constructor …
github-actions[bot] Nov 12, 2025
8a62fe1
refactor: remove redundant ownership checks from _close() methods and…
github-actions[bot] Nov 12, 2025
88ae872
fix: address CodeRabbit PR feedback on lifecycle management
github-actions[bot] Nov 12, 2025
0f91cf0
fix: address CodeRabbit feedback on MongoDB/DynamoDB lifecycle manage…
github-actions[bot] Nov 12, 2025
2a3df53
refactor: use flag-based tracking instead of nullifying client
github-actions[bot] Nov 12, 2025
1fb274d
fix: prevent double-close by resetting flag after cleanup completes
github-actions[bot] Nov 12, 2025
cb76041
Merge branch 'main' into claude/issue-232-20251110-1652
strawgate Nov 12, 2025
d60d466
refactor: move client context management to base class
github-actions[bot] Nov 12, 2025
ab49e98
refactor: use exit stacks for client lifecycle management
github-actions[bot] Nov 13, 2025
3b465db
refactor: stores register cleanup callbacks directly with exit stack
github-actions[bot] Nov 13, 2025
b7d0949
refactor: stores register cleanup callbacks directly with exit stack
github-actions[bot] Nov 13, 2025
2fdb360
Merge branch 'main' into claude/issue-232-20251110-1652
strawgate Nov 16, 2025
d056841
Update stores for context mgr changes
strawgate Nov 16, 2025
6686ba2
Make missing rocksdb dir
strawgate Nov 16, 2025
348aabe
Updates for rocksdb tests
strawgate Nov 16, 2025
e96b709
Additional PR feedback
strawgate Nov 16, 2025
bfd3d8d
Fix type checking
strawgate Nov 16, 2025
3669ae2
Small rocksdb test changes
strawgate Nov 16, 2025
895072c
Duckdb test updates
strawgate Nov 16, 2025
501740d
Dont use test.db as duckdb db path
strawgate Nov 16, 2025
a20cfd7
duckdb test fixes
strawgate Nov 16, 2025
3e7c010
duckdb test changes
strawgate Nov 16, 2025
201ebfd
more duckdb test fixes
strawgate Nov 16, 2025
4bb78cf
Try to get Windows tests passing
strawgate Nov 16, 2025
8f05cc9
Make temp directory earlier in the fixture stack
strawgate Nov 16, 2025
15a6146
Move temp dir to base store tests
strawgate Nov 16, 2025
7942382
Ignore cleanup errors on Windows
strawgate Nov 16, 2025
73 changes: 63 additions & 10 deletions key-value/key-value-aio/src/key_value/aio/stores/base.py
@@ -6,6 +6,7 @@
from asyncio.locks import Lock
from collections import defaultdict
from collections.abc import Mapping, Sequence
from contextlib import AsyncExitStack
from datetime import datetime
from types import MappingProxyType, TracebackType
from typing import Any, SupportsFloat
@@ -85,6 +86,7 @@ def __init__(
collection_sanitization_strategy: SanitizationStrategy | None = None,
default_collection: str | None = None,
seed: SEED_DATA_TYPE | None = None,
stable_api: bool = False,
) -> None:
"""Initialize the managed key-value store.

@@ -97,6 +99,8 @@ def __init__(
seed: Optional seed data to pre-populate the store. Format: {collection: {key: {field: value, ...}}}.
Seeding occurs once during store initialization (when the store is first entered or when the
first operation is performed on the store).
stable_api: Whether this store implementation has a stable API. If False, a warning will be issued.
Defaults to False.
"""

self._setup_complete = False
@@ -113,8 +117,7 @@ def __init__(
self._key_sanitization_strategy = key_sanitization_strategy or PassthroughStrategy()
self._collection_sanitization_strategy = collection_sanitization_strategy or PassthroughStrategy()

if not hasattr(self, "_stable_api"):
self._stable_api = False
self._stable_api = stable_api

if not self._stable_api:
self._warn_about_stability()
@@ -425,24 +428,74 @@ async def _get_collection_keys(self, *, collection: str, limit: int | None = Non


class BaseContextManagerStore(BaseStore, ABC):
"""An abstract base class for context manager stores."""
"""An abstract base class for context manager stores.

    Stores that accept a client parameter should pass `client_provided_by_user=True` when
    the caller supplies a client. This ensures the store does not manage the lifecycle of
    user-provided clients (i.e., does not close them).

The base class provides an AsyncExitStack that stores can use to register cleanup
callbacks. Stores should add their cleanup operations to the exit stack as needed.
The base class handles entering and exiting the exit stack.
"""

_client_provided_by_user: bool
_exit_stack: AsyncExitStack
_exit_stack_entered: bool

def __init__(self, *, client_provided_by_user: bool = False, **kwargs: Any) -> None:
"""Initialize the context manager store with client ownership configuration.

Args:
client_provided_by_user: Whether the client was provided by the user. If True,
the store will not manage the client's lifecycle (will not close it).
Defaults to False.
**kwargs: Additional arguments to pass to the base store constructor.
"""
self._client_provided_by_user = client_provided_by_user
self._exit_stack = AsyncExitStack()
self._exit_stack_entered = False
super().__init__(**kwargs)

async def _ensure_exit_stack_entered(self) -> None:
"""Ensure the exit stack has been entered."""
if not self._exit_stack_entered:
await self._exit_stack.__aenter__()
self._exit_stack_entered = True

async def __aenter__(self) -> Self:
# Enter the exit stack
await self._ensure_exit_stack_entered()
await self.setup()
return self

async def __aexit__(
self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None
) -> None:
await self._close()
) -> bool | None:
# Close the exit stack, which handles all cleanup
if self._exit_stack_entered:
result = await self._exit_stack.__aexit__(exc_type, exc_value, traceback)
self._exit_stack_entered = False

return result
return None

async def close(self) -> None:
await self._close()
# Close the exit stack if it has been entered
if self._exit_stack_entered:
await self._exit_stack.aclose()
self._exit_stack_entered = False

@abstractmethod
async def _close(self) -> None:
"""Close the store."""
...
async def setup(self) -> None:
"""Initialize the store if not already initialized.

This override ensures the exit stack is entered before the store's _setup()
method is called, allowing stores to register cleanup callbacks during setup.
"""
# Ensure exit stack is entered
await self._ensure_exit_stack_entered()
# Call parent setup
await super().setup()


class BaseEnumerateCollectionsStore(BaseStore, AsyncEnumerateCollectionsProtocol, ABC):
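Note: the per-store diffs below all apply the same pattern on top of this base class. A minimal, hypothetical sketch of that pattern (the store and client names are invented; only the constructor flags and the exit-stack registration mirror this change, and the `override` import is an assumption that may come from `typing` on newer Python versions):

# Hypothetical store, for illustration only -- not part of this PR.
from typing_extensions import override

from key_value.aio.stores.base import BaseContextManagerStore, BaseStore


class ExampleClient:
    """Stand-in for a real client (database connection, SDK client, etc.)."""

    def close(self) -> None: ...


class ExampleStore(BaseContextManagerStore, BaseStore):
    def __init__(self, *, client: ExampleClient | None = None, default_collection: str | None = None) -> None:
        # Only claim ownership of clients the store creates itself.
        client_provided = client is not None
        self._client = client or ExampleClient()

        super().__init__(
            default_collection=default_collection,
            client_provided_by_user=client_provided,
            stable_api=False,
        )

    @override
    async def _setup(self) -> None:
        # Cleanup is registered on the base class's exit stack; it runs when the
        # store's context exits or close() is called, but only for store-owned clients.
        if not self._client_provided_by_user:
            self._exit_stack.callback(self._client.close)

    # _get_managed_entry / _put_managed_entry / _delete_managed_entry omitted for brevity.

Because setup() enters the exit stack before calling _setup(), anything registered there is always unwound by __aexit__() or close().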
@@ -99,10 +99,17 @@ def default_disk_cache_factory(collection: str) -> Cache:

self._cache = {}

self._stable_api = True
self._serialization_adapter = BasicSerializationAdapter()

super().__init__(default_collection=default_collection)
super().__init__(
default_collection=default_collection,
stable_api=True,
)

@override
async def _setup(self) -> None:
"""Register cache cleanup."""
self._exit_stack.callback(self._sync_close)

@override
async def _setup_collection(self, *, collection: str) -> None:
@@ -146,9 +153,5 @@ def _sync_close(self) -> None:
for cache in self._cache.values():
cache.close()

@override
async def _close(self) -> None:
self._sync_close()

def __del__(self) -> None:
self._sync_close()
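The callback registration above relies on standard contextlib.AsyncExitStack behavior: callback() takes a synchronous function, push_async_callback() takes an async one (not needed for diskcache, but relevant for async clients), and everything registered is unwound in LIFO order when the stack closes. A standalone sketch, independent of this codebase:

import asyncio
from contextlib import AsyncExitStack


def close_cache() -> None:
    print("cache closed")


async def close_connection() -> None:
    print("connection closed")


async def main() -> None:
    async with AsyncExitStack() as stack:
        stack.callback(close_cache)                  # sync cleanup, like cache.close above
        stack.push_async_callback(close_connection)  # async cleanup, for async clients
        print("doing work")
    # Exiting the block prints, in LIFO order:
    #   connection closed
    #   cache closed


asyncio.run(main())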
26 changes: 18 additions & 8 deletions key-value/key-value-aio/src/key_value/aio/stores/disk/store.py
@@ -50,7 +50,9 @@ def __init__(
"""Initialize the disk store.

Args:
disk_cache: An existing diskcache Cache instance to use.
disk_cache: An existing diskcache Cache instance to use. If provided, the store will
not manage the cache's lifecycle (will not close it). The caller is responsible
for managing the cache's lifecycle.
directory: The directory to use for the disk store.
max_size: The maximum size of the disk store.
default_collection: The default collection to use if no collection is provided.
@@ -63,6 +65,9 @@ def __init__(
msg = "Either disk_cache or directory must be provided"
raise ValueError(msg)

client_provided = disk_cache is not None
self._client_provided_by_user = client_provided

if disk_cache:
self._cache = disk_cache
elif directory:
@@ -75,9 +80,17 @@
else:
self._cache = Cache(directory=directory, eviction_policy="none")

self._stable_api = True
super().__init__(
default_collection=default_collection,
client_provided_by_user=client_provided,
stable_api=True,
)

super().__init__(default_collection=default_collection)
@override
async def _setup(self) -> None:
"""Register cache cleanup if we own the cache."""
if not self._client_provided_by_user:
self._exit_stack.callback(self._cache.close)

@override
async def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry | None:
@@ -119,9 +132,6 @@ async def _delete_managed_entry(self, *, key: str, collection: str) -> bool:

return self._cache.delete(key=combo_key, retry=True)

@override
async def _close(self) -> None:
self._cache.close()

def __del__(self) -> None:
self._cache.close()
if not getattr(self, "_client_provided_by_user", False) and hasattr(self, "_cache"):
self._cache.close()
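With this change the DiskStore's lifecycle depends on who created the cache. A usage sketch (the import path is assumed from the file location above; the store bodies are elided because the read/write API is unchanged by this PR):

import asyncio

from diskcache import Cache

from key_value.aio.stores.disk.store import DiskStore  # assumed import path


async def main() -> None:
    # Store-owned cache: created from `directory`, closed automatically on exit.
    async with DiskStore(directory="./kv-data") as store:
        ...  # use the store

    # User-provided cache: the store never closes it; the caller is responsible.
    cache = Cache(directory="./kv-data-external")
    try:
        async with DiskStore(disk_cache=cache) as store:
            ...  # use the store
    finally:
        cache.close()


asyncio.run(main())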
61 changes: 17 additions & 44 deletions key-value/key-value-aio/src/key_value/aio/stores/duckdb/store.py
@@ -71,15 +71,9 @@ class DuckDBStore(BaseContextManagerStore, BaseStore):

Values are stored in a JSON column as native dicts, allowing direct SQL queries
on the stored data for analytics and reporting.

Note on connection ownership: When you provide an existing connection, the store
will take ownership and close it when the store is closed or garbage collected.
If you need to reuse a connection, create separate DuckDB connections for each store.
"""

_connection: duckdb.DuckDBPyConnection
_is_closed: bool
_owns_connection: bool
_adapter: SerializationAdapter
_table_name: str

@@ -94,9 +88,8 @@
) -> None:
"""Initialize the DuckDB store with an existing connection.

Warning: The store will take ownership of the connection and close it
when the store is closed or garbage collected. If you need to reuse
a connection, create separate DuckDB connections for each store.
Note: If you provide a connection, the store will NOT manage its lifecycle (will not
close it). The caller is responsible for managing the connection's lifecycle.

Args:
connection: An existing DuckDB connection to use.
@@ -135,7 +128,9 @@ def __init__(
"""Initialize the DuckDB store.

Args:
connection: An existing DuckDB connection to use.
connection: An existing DuckDB connection to use. If provided, the store will NOT
manage its lifecycle (will not close it). The caller is responsible for managing
the connection's lifecycle.
database_path: Path to the database file. If None or ':memory:', uses in-memory database.
table_name: Name of the table to store key-value entries. Defaults to "kv_entries".
default_collection: The default collection to use if no collection is provided.
@@ -145,9 +140,10 @@ def __init__(
msg = "Provide only one of connection or database_path"
raise ValueError(msg)

client_provided = connection is not None

if connection is not None:
self._connection = connection
self._owns_connection = True # We take ownership even of provided connections
else:
# Convert Path to string if needed
if isinstance(database_path, Path):
@@ -158,19 +154,21 @@
self._connection = duckdb.connect(":memory:")
else:
self._connection = duckdb.connect(database=database_path)
self._owns_connection = True

self._is_closed = False
self._adapter = DuckDBSerializationAdapter()

# Validate table name to prevent SQL injection
if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", table_name):
msg = "Table name must start with a letter or underscore and contain only letters, digits, or underscores"
raise ValueError(msg)
self._table_name = table_name
self._stable_api = False

super().__init__(default_collection=default_collection, seed=seed)
super().__init__(
default_collection=default_collection,
seed=seed,
client_provided_by_user=client_provided,
stable_api=False,
)

def _get_create_table_sql(self) -> str:
"""Generate SQL for creating the key-value entries table.
@@ -263,6 +261,10 @@ async def _setup(self) -> None:
- Metadata queries without JSON deserialization
- Native JSON column support for rich querying capabilities
"""
# Register connection cleanup if we own the connection
if not self._client_provided_by_user:
self._exit_stack.callback(self._connection.close)

# Create the main table for storing key-value entries
self._connection.execute(self._get_create_table_sql())

@@ -279,10 +281,6 @@ async def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry
Reconstructs the ManagedEntry from value column and metadata columns
using the serialization adapter.
"""
if self._is_closed:
msg = "Cannot operate on closed DuckDBStore"
raise RuntimeError(msg)

result = self._connection.execute(
self._get_select_sql(),
[collection, key],
@@ -323,10 +321,6 @@ async def _put_managed_entry(
Uses the serialization adapter to convert the ManagedEntry to the
appropriate storage format.
"""
if self._is_closed:
msg = "Cannot operate on closed DuckDBStore"
raise RuntimeError(msg)

# Ensure that the value is serializable to JSON
_ = managed_entry.value_as_json

@@ -349,10 +343,6 @@ async def _put_managed_entry(
@override
async def _delete_managed_entry(self, *, key: str, collection: str) -> bool:
"""Delete a managed entry by key from the specified collection."""
if self._is_closed:
msg = "Cannot operate on closed DuckDBStore"
raise RuntimeError(msg)

result = self._connection.execute(
self._get_delete_sql(),
[collection, key],
@@ -361,20 +351,3 @@ async def _delete_managed_entry(self, *, key: str, collection: str) -> bool:
# Check if any rows were deleted by counting returned rows
deleted_rows = result.fetchall()
return len(deleted_rows) > 0

@override
async def _close(self) -> None:
"""Close the DuckDB connection."""
if not self._is_closed and self._owns_connection:
self._connection.close()
self._is_closed = True

def __del__(self) -> None:
"""Clean up the DuckDB connection on deletion."""
try:
if not self._is_closed and self._owns_connection and hasattr(self, "_connection"):
self._connection.close()
self._is_closed = True
except Exception: # noqa: S110
# Suppress errors during cleanup to avoid issues during interpreter shutdown
pass
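The same split applies to DuckDBStore now that the _close()/_is_closed/_owns_connection machinery is gone: a store-created connection is closed by the exit stack, while a user-provided connection stays open. A sketch (import path assumed; constructor parameters taken from the diff above):

import asyncio

import duckdb

from key_value.aio.stores.duckdb.store import DuckDBStore  # assumed import path


async def main() -> None:
    # Store-created connection: registered with the exit stack in _setup()
    # and closed when the store's context exits.
    async with DuckDBStore(database_path="kv.duckdb") as store:
        ...  # use the store

    # User-provided connection: client_provided_by_user is inferred as True,
    # so the store leaves the connection open for the caller.
    conn = duckdb.connect(":memory:")
    async with DuckDBStore(connection=conn) as store:
        ...  # use the store
    conn.execute("SELECT 1")  # still usable after the store exits
    conn.close()


asyncio.run(main())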