diff --git a/key-value/key-value-aio/src/key_value/aio/stores/base.py b/key-value/key-value-aio/src/key_value/aio/stores/base.py index 3b5f1e56..e511dc86 100644 --- a/key-value/key-value-aio/src/key_value/aio/stores/base.py +++ b/key-value/key-value-aio/src/key_value/aio/stores/base.py @@ -6,6 +6,7 @@ from asyncio.locks import Lock from collections import defaultdict from collections.abc import Mapping, Sequence +from contextlib import AsyncExitStack from datetime import datetime from types import MappingProxyType, TracebackType from typing import Any, SupportsFloat @@ -85,6 +86,7 @@ def __init__( collection_sanitization_strategy: SanitizationStrategy | None = None, default_collection: str | None = None, seed: SEED_DATA_TYPE | None = None, + stable_api: bool = False, ) -> None: """Initialize the managed key-value store. @@ -97,6 +99,8 @@ def __init__( seed: Optional seed data to pre-populate the store. Format: {collection: {key: {field: value, ...}}}. Seeding occurs once during store initialization (when the store is first entered or when the first operation is performed on the store). + stable_api: Whether this store implementation has a stable API. If False, a warning will be issued. + Defaults to False. """ self._setup_complete = False @@ -113,8 +117,7 @@ def __init__( self._key_sanitization_strategy = key_sanitization_strategy or PassthroughStrategy() self._collection_sanitization_strategy = collection_sanitization_strategy or PassthroughStrategy() - if not hasattr(self, "_stable_api"): - self._stable_api = False + self._stable_api = stable_api if not self._stable_api: self._warn_about_stability() @@ -425,24 +428,74 @@ async def _get_collection_keys(self, *, collection: str, limit: int | None = Non class BaseContextManagerStore(BaseStore, ABC): - """An abstract base class for context manager stores.""" + """An abstract base class for context manager stores. + + Stores that accept a client parameter should pass `client_provided_by_user=True` to + the constructor. This ensures the store does not manage the lifecycle of user-provided + clients (i.e., does not close them). + + The base class provides an AsyncExitStack that stores can use to register cleanup + callbacks. Stores should add their cleanup operations to the exit stack as needed. + The base class handles entering and exiting the exit stack. + """ + + _client_provided_by_user: bool + _exit_stack: AsyncExitStack + _exit_stack_entered: bool + + def __init__(self, *, client_provided_by_user: bool = False, **kwargs: Any) -> None: + """Initialize the context manager store with client ownership configuration. + + Args: + client_provided_by_user: Whether the client was provided by the user. If True, + the store will not manage the client's lifecycle (will not close it). + Defaults to False. + **kwargs: Additional arguments to pass to the base store constructor. + """ + self._client_provided_by_user = client_provided_by_user + self._exit_stack = AsyncExitStack() + self._exit_stack_entered = False + super().__init__(**kwargs) + + async def _ensure_exit_stack_entered(self) -> None: + """Ensure the exit stack has been entered.""" + if not self._exit_stack_entered: + await self._exit_stack.__aenter__() + self._exit_stack_entered = True async def __aenter__(self) -> Self: + # Enter the exit stack + await self._ensure_exit_stack_entered() await self.setup() return self async def __aexit__( self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None - ) -> None: - await self._close() + ) -> bool | None: + # Close the exit stack, which handles all cleanup + if self._exit_stack_entered: + result = await self._exit_stack.__aexit__(exc_type, exc_value, traceback) + self._exit_stack_entered = False + + return result + return None async def close(self) -> None: - await self._close() + # Close the exit stack if it has been entered + if self._exit_stack_entered: + await self._exit_stack.aclose() + self._exit_stack_entered = False - @abstractmethod - async def _close(self) -> None: - """Close the store.""" - ... + async def setup(self) -> None: + """Initialize the store if not already initialized. + + This override ensures the exit stack is entered before the store's _setup() + method is called, allowing stores to register cleanup callbacks during setup. + """ + # Ensure exit stack is entered + await self._ensure_exit_stack_entered() + # Call parent setup + await super().setup() class BaseEnumerateCollectionsStore(BaseStore, AsyncEnumerateCollectionsProtocol, ABC): diff --git a/key-value/key-value-aio/src/key_value/aio/stores/disk/multi_store.py b/key-value/key-value-aio/src/key_value/aio/stores/disk/multi_store.py index ed893c53..f5e7f59a 100644 --- a/key-value/key-value-aio/src/key_value/aio/stores/disk/multi_store.py +++ b/key-value/key-value-aio/src/key_value/aio/stores/disk/multi_store.py @@ -99,10 +99,17 @@ def default_disk_cache_factory(collection: str) -> Cache: self._cache = {} - self._stable_api = True self._serialization_adapter = BasicSerializationAdapter() - super().__init__(default_collection=default_collection) + super().__init__( + default_collection=default_collection, + stable_api=True, + ) + + @override + async def _setup(self) -> None: + """Register cache cleanup.""" + self._exit_stack.callback(self._sync_close) @override async def _setup_collection(self, *, collection: str) -> None: @@ -146,9 +153,5 @@ def _sync_close(self) -> None: for cache in self._cache.values(): cache.close() - @override - async def _close(self) -> None: - self._sync_close() - def __del__(self) -> None: self._sync_close() diff --git a/key-value/key-value-aio/src/key_value/aio/stores/disk/store.py b/key-value/key-value-aio/src/key_value/aio/stores/disk/store.py index f88b6e1c..fc165987 100644 --- a/key-value/key-value-aio/src/key_value/aio/stores/disk/store.py +++ b/key-value/key-value-aio/src/key_value/aio/stores/disk/store.py @@ -50,7 +50,9 @@ def __init__( """Initialize the disk store. Args: - disk_cache: An existing diskcache Cache instance to use. + disk_cache: An existing diskcache Cache instance to use. If provided, the store will + not manage the cache's lifecycle (will not close it). The caller is responsible + for managing the cache's lifecycle. directory: The directory to use for the disk store. max_size: The maximum size of the disk store. default_collection: The default collection to use if no collection is provided. @@ -63,6 +65,9 @@ def __init__( msg = "Either disk_cache or directory must be provided" raise ValueError(msg) + client_provided = disk_cache is not None + self._client_provided_by_user = client_provided + if disk_cache: self._cache = disk_cache elif directory: @@ -75,9 +80,17 @@ def __init__( else: self._cache = Cache(directory=directory, eviction_policy="none") - self._stable_api = True + super().__init__( + default_collection=default_collection, + client_provided_by_user=client_provided, + stable_api=True, + ) - super().__init__(default_collection=default_collection) + @override + async def _setup(self) -> None: + """Register cache cleanup if we own the cache.""" + if not self._client_provided_by_user: + self._exit_stack.callback(self._cache.close) @override async def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry | None: @@ -119,9 +132,6 @@ async def _delete_managed_entry(self, *, key: str, collection: str) -> bool: return self._cache.delete(key=combo_key, retry=True) - @override - async def _close(self) -> None: - self._cache.close() - def __del__(self) -> None: - self._cache.close() + if not getattr(self, "_client_provided_by_user", False) and hasattr(self, "_cache"): + self._cache.close() diff --git a/key-value/key-value-aio/src/key_value/aio/stores/duckdb/store.py b/key-value/key-value-aio/src/key_value/aio/stores/duckdb/store.py index bb4ff890..583392fd 100644 --- a/key-value/key-value-aio/src/key_value/aio/stores/duckdb/store.py +++ b/key-value/key-value-aio/src/key_value/aio/stores/duckdb/store.py @@ -71,15 +71,9 @@ class DuckDBStore(BaseContextManagerStore, BaseStore): Values are stored in a JSON column as native dicts, allowing direct SQL queries on the stored data for analytics and reporting. - - Note on connection ownership: When you provide an existing connection, the store - will take ownership and close it when the store is closed or garbage collected. - If you need to reuse a connection, create separate DuckDB connections for each store. """ _connection: duckdb.DuckDBPyConnection - _is_closed: bool - _owns_connection: bool _adapter: SerializationAdapter _table_name: str @@ -94,9 +88,8 @@ def __init__( ) -> None: """Initialize the DuckDB store with an existing connection. - Warning: The store will take ownership of the connection and close it - when the store is closed or garbage collected. If you need to reuse - a connection, create separate DuckDB connections for each store. + Note: If you provide a connection, the store will NOT manage its lifecycle (will not + close it). The caller is responsible for managing the connection's lifecycle. Args: connection: An existing DuckDB connection to use. @@ -135,7 +128,9 @@ def __init__( """Initialize the DuckDB store. Args: - connection: An existing DuckDB connection to use. + connection: An existing DuckDB connection to use. If provided, the store will NOT + manage its lifecycle (will not close it). The caller is responsible for managing + the connection's lifecycle. database_path: Path to the database file. If None or ':memory:', uses in-memory database. table_name: Name of the table to store key-value entries. Defaults to "kv_entries". default_collection: The default collection to use if no collection is provided. @@ -145,9 +140,10 @@ def __init__( msg = "Provide only one of connection or database_path" raise ValueError(msg) + client_provided = connection is not None + if connection is not None: self._connection = connection - self._owns_connection = True # We take ownership even of provided connections else: # Convert Path to string if needed if isinstance(database_path, Path): @@ -158,9 +154,7 @@ def __init__( self._connection = duckdb.connect(":memory:") else: self._connection = duckdb.connect(database=database_path) - self._owns_connection = True - self._is_closed = False self._adapter = DuckDBSerializationAdapter() # Validate table name to prevent SQL injection @@ -168,9 +162,13 @@ def __init__( msg = "Table name must start with a letter or underscore and contain only letters, digits, or underscores" raise ValueError(msg) self._table_name = table_name - self._stable_api = False - super().__init__(default_collection=default_collection, seed=seed) + super().__init__( + default_collection=default_collection, + seed=seed, + client_provided_by_user=client_provided, + stable_api=False, + ) def _get_create_table_sql(self) -> str: """Generate SQL for creating the key-value entries table. @@ -263,6 +261,10 @@ async def _setup(self) -> None: - Metadata queries without JSON deserialization - Native JSON column support for rich querying capabilities """ + # Register connection cleanup if we own the connection + if not self._client_provided_by_user: + self._exit_stack.callback(self._connection.close) + # Create the main table for storing key-value entries self._connection.execute(self._get_create_table_sql()) @@ -279,10 +281,6 @@ async def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry Reconstructs the ManagedEntry from value column and metadata columns using the serialization adapter. """ - if self._is_closed: - msg = "Cannot operate on closed DuckDBStore" - raise RuntimeError(msg) - result = self._connection.execute( self._get_select_sql(), [collection, key], @@ -323,10 +321,6 @@ async def _put_managed_entry( Uses the serialization adapter to convert the ManagedEntry to the appropriate storage format. """ - if self._is_closed: - msg = "Cannot operate on closed DuckDBStore" - raise RuntimeError(msg) - # Ensure that the value is serializable to JSON _ = managed_entry.value_as_json @@ -349,10 +343,6 @@ async def _put_managed_entry( @override async def _delete_managed_entry(self, *, key: str, collection: str) -> bool: """Delete a managed entry by key from the specified collection.""" - if self._is_closed: - msg = "Cannot operate on closed DuckDBStore" - raise RuntimeError(msg) - result = self._connection.execute( self._get_delete_sql(), [collection, key], @@ -361,20 +351,3 @@ async def _delete_managed_entry(self, *, key: str, collection: str) -> bool: # Check if any rows were deleted by counting returned rows deleted_rows = result.fetchall() return len(deleted_rows) > 0 - - @override - async def _close(self) -> None: - """Close the DuckDB connection.""" - if not self._is_closed and self._owns_connection: - self._connection.close() - self._is_closed = True - - def __del__(self) -> None: - """Clean up the DuckDB connection on deletion.""" - try: - if not self._is_closed and self._owns_connection and hasattr(self, "_connection"): - self._connection.close() - self._is_closed = True - except Exception: # noqa: S110 - # Suppress errors during cleanup to avoid issues during interpreter shutdown - pass diff --git a/key-value/key-value-aio/src/key_value/aio/stores/dynamodb/store.py b/key-value/key-value-aio/src/key_value/aio/stores/dynamodb/store.py index 46c0dafd..0469dd96 100644 --- a/key-value/key-value-aio/src/key_value/aio/stores/dynamodb/store.py +++ b/key-value/key-value-aio/src/key_value/aio/stores/dynamodb/store.py @@ -1,9 +1,8 @@ from datetime import datetime, timezone -from types import TracebackType from typing import TYPE_CHECKING, Any, overload from key_value.shared.utils.managed_entry import ManagedEntry -from typing_extensions import Self, override +from typing_extensions import override from key_value.aio.stores.base import ( BaseContextManagerStore, @@ -90,6 +89,9 @@ def __init__( """Initialize the DynamoDB store. Args: + client: The DynamoDB client to use. If provided, the store will not manage the client's + lifecycle (will not enter/exit its context manager). The caller is responsible for + managing the client's lifecycle and must ensure the client is already entered. table_name: The name of the DynamoDB table to use. region_name: AWS region name. Defaults to None (uses AWS default). endpoint_url: Custom endpoint URL (useful for local DynamoDB). Defaults to None. @@ -99,8 +101,11 @@ def __init__( default_collection: The default collection to use if no collection is provided. """ self._table_name = table_name + client_provided = client is not None + if client: self._client = client + self._raw_client = None else: session: Session = aioboto3.Session( region_name=region_name, @@ -113,22 +118,10 @@ def __init__( self._client = None - super().__init__(default_collection=default_collection) - - @override - async def __aenter__(self) -> Self: - if self._raw_client: - self._client = await self._raw_client.__aenter__() - await super().__aenter__() - return self - - @override - async def __aexit__( - self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None - ) -> None: - await super().__aexit__(exc_type, exc_value, traceback) - if self._client: - await self._client.__aexit__(exc_type, exc_value, traceback) + super().__init__( + default_collection=default_collection, + client_provided_by_user=client_provided, + ) @property def _connected_client(self) -> DynamoDBClient: @@ -140,10 +133,9 @@ def _connected_client(self) -> DynamoDBClient: @override async def _setup(self) -> None: """Setup the DynamoDB client and ensure table exists.""" - - if not self._client: - self._client = await self._raw_client.__aenter__() - + # Register client cleanup if we own the client + if not self._client_provided_by_user and self._raw_client is not None: + self._client = await self._exit_stack.enter_async_context(self._raw_client) try: await self._connected_client.describe_table(TableName=self._table_name) # pyright: ignore[reportUnknownMemberType] except self._connected_client.exceptions.ResourceNotFoundException: # pyright: ignore[reportUnknownMemberType] @@ -253,8 +245,4 @@ async def _delete_managed_entry(self, *, key: str, collection: str) -> bool: # Return True if an item was actually deleted return "Attributes" in response # pyright: ignore[reportUnknownArgumentType] - @override - async def _close(self) -> None: - """Close the DynamoDB client.""" - if self._client: - await self._client.__aexit__(None, None, None) # pyright: ignore[reportUnknownMemberType] + # No need to override _close - the exit stack handles all cleanup automatically diff --git a/key-value/key-value-aio/src/key_value/aio/stores/elasticsearch/store.py b/key-value/key-value-aio/src/key_value/aio/stores/elasticsearch/store.py index 9b120157..78fe809b 100644 --- a/key-value/key-value-aio/src/key_value/aio/stores/elasticsearch/store.py +++ b/key-value/key-value-aio/src/key_value/aio/stores/elasticsearch/store.py @@ -215,7 +215,9 @@ def __init__( """Initialize the elasticsearch store. Args: - elasticsearch_client: The elasticsearch client to use. + elasticsearch_client: The elasticsearch client to use. If provided, the store will not + manage the client's lifecycle (will not close it). The caller is responsible for + managing the client's lifecycle. url: The url of the elasticsearch cluster. api_key: The api key to use. index_prefix: The index prefix to use. Collections will be prefixed with this prefix. @@ -227,6 +229,8 @@ def __init__( msg = "Either elasticsearch_client or url must be provided" raise ValueError(msg) + client_provided = elasticsearch_client is not None + if elasticsearch_client: self._client = elasticsearch_client elif url: @@ -250,10 +254,15 @@ def __init__( default_collection=default_collection, collection_sanitization_strategy=collection_sanitization_strategy, key_sanitization_strategy=key_sanitization_strategy, + client_provided_by_user=client_provided, ) @override async def _setup(self) -> None: + # Register client cleanup if we own the client + if not self._client_provided_by_user: + self._exit_stack.push_async_callback(self._client.close) + cluster_info = await self._client.options(ignore_status=404).info() self._is_serverless = cluster_info.get("version", {}).get("build_flavor") == "serverless" @@ -546,7 +555,3 @@ async def _cull(self) -> None: }, }, ) - - @override - async def _close(self) -> None: - await self._client.close() diff --git a/key-value/key-value-aio/src/key_value/aio/stores/memcached/store.py b/key-value/key-value-aio/src/key_value/aio/stores/memcached/store.py index 0f39229c..7be9cd69 100644 --- a/key-value/key-value-aio/src/key_value/aio/stores/memcached/store.py +++ b/key-value/key-value-aio/src/key_value/aio/stores/memcached/store.py @@ -60,19 +60,33 @@ def __init__( """Initialize the Memcached store. Args: - client: An existing aiomcache client to use. + client: An existing aiomcache client to use. If provided, the store will not manage + the client's lifecycle (will not close it). The caller is responsible for + managing the client's lifecycle. host: Memcached host. Defaults to 127.0.0.1. port: Memcached port. Defaults to 11211. default_collection: The default collection to use if no collection is provided. key_sanitization_strategy: The sanitization strategy to use for keys. """ - self._client = client or Client(host=host, port=port) + client_provided = client is not None + + if client is not None: + self._client = client + else: + self._client = Client(host=host, port=port) super().__init__( default_collection=default_collection, key_sanitization_strategy=key_sanitization_strategy, + client_provided_by_user=client_provided, ) + @override + async def _setup(self) -> None: + """Register client cleanup if we own the client.""" + if not self._client_provided_by_user: + self._exit_stack.push_async_callback(self._client.close) + @override async def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry | None: combo_key: str = self._sanitize_key(compound_key(collection=collection, key=key)) @@ -143,7 +157,3 @@ async def _delete_managed_entry(self, *, key: str, collection: str) -> bool: async def _delete_store(self) -> bool: _ = await self._client.flush_all() return True - - @override - async def _close(self) -> None: - await self._client.close() diff --git a/key-value/key-value-aio/src/key_value/aio/stores/memory/store.py b/key-value/key-value-aio/src/key_value/aio/stores/memory/store.py index 05f7ad0c..98c68f46 100644 --- a/key-value/key-value-aio/src/key_value/aio/stores/memory/store.py +++ b/key-value/key-value-aio/src/key_value/aio/stores/memory/store.py @@ -115,9 +115,11 @@ def __init__( self._cache = {} - self._stable_api = True - - super().__init__(default_collection=default_collection, seed=seed) + super().__init__( + default_collection=default_collection, + seed=seed, + stable_api=True, + ) @override async def _setup(self) -> None: diff --git a/key-value/key-value-aio/src/key_value/aio/stores/mongodb/store.py b/key-value/key-value-aio/src/key_value/aio/stores/mongodb/store.py index 9c0fab07..c922b090 100644 --- a/key-value/key-value-aio/src/key_value/aio/stores/mongodb/store.py +++ b/key-value/key-value-aio/src/key_value/aio/stores/mongodb/store.py @@ -8,7 +8,7 @@ from key_value.shared.utils.sanitization import HybridSanitizationStrategy, SanitizationStrategy from key_value.shared.utils.sanitize import ALPHANUMERIC_CHARACTERS from key_value.shared.utils.serialization import SerializationAdapter -from typing_extensions import Self, override +from typing_extensions import override from key_value.aio.stores.base import BaseContextManagerStore, BaseDestroyCollectionStore, BaseStore @@ -159,7 +159,9 @@ def __init__( Values are stored as native BSON dictionaries for better query support and performance. Args: - client: The MongoDB client to use (mutually exclusive with url). + client: The MongoDB client to use (mutually exclusive with url). If provided, the store + will not manage the client's lifecycle (will not enter/exit its context manager or + close it). The caller is responsible for managing the client's lifecycle. url: The url of the MongoDB cluster (mutually exclusive with client). db_name: The name of the MongoDB database. coll_name: The name of the MongoDB collection. @@ -167,6 +169,8 @@ def __init__( collection_sanitization_strategy: The sanitization strategy to use for collections. """ + client_provided = client is not None + if client: self._client = client elif url: @@ -185,18 +189,14 @@ def __init__( super().__init__( default_collection=default_collection, collection_sanitization_strategy=collection_sanitization_strategy, + client_provided_by_user=client_provided, ) @override - async def __aenter__(self) -> Self: - _ = await self._client.__aenter__() - await super().__aenter__() - return self - - @override - async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: # pyright: ignore[reportAny] - await super().__aexit__(exc_type, exc_val, exc_tb) - await self._client.__aexit__(exc_type, exc_val, exc_tb) + async def _setup(self) -> None: + """Register client cleanup if we own the client.""" + if not self._client_provided_by_user: + await self._exit_stack.enter_async_context(self._client) @override async def _setup_collection(self, *, collection: str) -> None: @@ -334,6 +334,4 @@ async def _delete_collection(self, *, collection: str) -> bool: return True - @override - async def _close(self) -> None: - await self._client.close() + # No need to override _close - the exit stack handles all cleanup automatically diff --git a/key-value/key-value-aio/src/key_value/aio/stores/redis/store.py b/key-value/key-value-aio/src/key_value/aio/stores/redis/store.py index 1299e7ab..985ebce9 100644 --- a/key-value/key-value-aio/src/key_value/aio/stores/redis/store.py +++ b/key-value/key-value-aio/src/key_value/aio/stores/redis/store.py @@ -54,7 +54,9 @@ def __init__( """Initialize the Redis store. Args: - client: An existing Redis client to use. + client: An existing Redis client to use. If provided, the store will not manage + the client's lifecycle (will not close it). The caller is responsible for + managing the client's lifecycle. url: Redis URL (e.g., redis://localhost:6379/0). host: Redis host. Defaults to localhost. port: Redis port. Defaults to 6379. @@ -62,6 +64,8 @@ def __init__( password: Redis password. Defaults to None. default_collection: The default collection to use if no collection is provided. """ + client_provided = client is not None + if client: self._client = client elif url: @@ -82,10 +86,13 @@ def __init__( decode_responses=True, ) - self._stable_api = True self._adapter = BasicSerializationAdapter(date_format="isoformat", value_format="dict") - super().__init__(default_collection=default_collection) + super().__init__( + default_collection=default_collection, + client_provided_by_user=client_provided, + stable_api=True, + ) @override async def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry | None: @@ -212,9 +219,11 @@ async def _get_collection_keys(self, *, collection: str, limit: int | None = Non return get_keys_from_compound_keys(compound_keys=keys, collection=collection) @override - async def _delete_store(self) -> bool: - return await self._client.flushdb() # pyright: ignore[reportUnknownMemberType, reportAny] + async def _setup(self) -> None: + """Register client cleanup if we own the client.""" + if not self._client_provided_by_user: + self._exit_stack.push_async_callback(self._client.aclose) @override - async def _close(self) -> None: - await self._client.aclose() + async def _delete_store(self) -> bool: + return await self._client.flushdb() # pyright: ignore[reportUnknownMemberType, reportAny] diff --git a/key-value/key-value-aio/src/key_value/aio/stores/rocksdb/store.py b/key-value/key-value-aio/src/key_value/aio/stores/rocksdb/store.py index b845d7c6..b9b18b8f 100644 --- a/key-value/key-value-aio/src/key_value/aio/stores/rocksdb/store.py +++ b/key-value/key-value-aio/src/key_value/aio/stores/rocksdb/store.py @@ -1,9 +1,8 @@ from collections.abc import Sequence from datetime import datetime from pathlib import Path -from typing import Any, overload +from typing import overload -from key_value.shared.errors.store import KeyValueStoreError from key_value.shared.utils.compound import compound_key from key_value.shared.utils.managed_entry import ManagedEntry from typing_extensions import override @@ -21,7 +20,6 @@ class RocksDBStore(BaseContextManagerStore, BaseStore): """A RocksDB-based key-value store.""" _db: Rdict - _is_closed: bool @overload def __init__(self, *, db: Rdict, default_collection: str | None = None) -> None: @@ -51,7 +49,9 @@ def __init__( """Initialize the RocksDB store. Args: - db: An existing Rdict database instance to use. + db: An existing Rdict database instance to use. If provided, the store will NOT + manage its lifecycle (will not close it). The caller is responsible for managing + the database's lifecycle. path: The path to the RocksDB database directory. default_collection: The default collection to use if no collection is provided. """ @@ -63,6 +63,8 @@ def __init__( msg = "Either db or path must be provided" raise ValueError(msg) + client_provided = db is not None + if db: self._db = db elif path: @@ -74,33 +76,24 @@ def __init__( self._db = Rdict(str(path), options=opts) - self._is_closed = False - - super().__init__(default_collection=default_collection) + super().__init__( + default_collection=default_collection, + client_provided_by_user=client_provided, + ) @override - async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: # pyright: ignore[reportAny] - await super().__aexit__(exc_type, exc_val, exc_tb) - await self._close() - - @override - async def _close(self) -> None: - self._close_and_flush() + async def _setup(self) -> None: + """Register database cleanup if we own the database.""" + if not self._client_provided_by_user: + # Register a callback to close and flush the database + self._exit_stack.callback(self._close_and_flush) def _close_and_flush(self) -> None: - if not self._is_closed: - self._db.flush() - self._db.close() - self._is_closed = True - - def _fail_on_closed_store(self) -> None: - if self._is_closed: - raise KeyValueStoreError(message="Operation attempted on closed store") + self._db.flush() + self._db.close() @override async def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry | None: - self._fail_on_closed_store() - combo_key: str = compound_key(collection=collection, key=key) value: bytes | None = self._db.get(combo_key) @@ -121,8 +114,6 @@ async def _put_managed_entry( collection: str, managed_entry: ManagedEntry, ) -> None: - self._fail_on_closed_store() - combo_key: str = compound_key(collection=collection, key=key) json_value: str = self._serialization_adapter.dump_json(entry=managed_entry, key=key, collection=collection) @@ -139,8 +130,6 @@ async def _put_managed_entries( created_at: datetime, expires_at: datetime | None, ) -> None: - self._fail_on_closed_store() - if not keys: return @@ -154,8 +143,6 @@ async def _put_managed_entries( @override async def _delete_managed_entry(self, *, key: str, collection: str) -> bool: - self._fail_on_closed_store() - combo_key: str = compound_key(collection=collection, key=key) # Check if key exists before deleting, this is only used for tracking deleted count @@ -167,8 +154,6 @@ async def _delete_managed_entry(self, *, key: str, collection: str) -> bool: @override async def _delete_managed_entries(self, *, keys: Sequence[str], collection: str) -> int: - self._fail_on_closed_store() - if not keys: return 0 @@ -189,6 +174,3 @@ async def _delete_managed_entries(self, *, keys: Sequence[str], collection: str) self._db.write(batch) return deleted_count - - def __del__(self) -> None: - self._close_and_flush() diff --git a/key-value/key-value-aio/src/key_value/aio/stores/valkey/store.py b/key-value/key-value-aio/src/key_value/aio/stores/valkey/store.py index ec34114d..a5d8f380 100644 --- a/key-value/key-value-aio/src/key_value/aio/stores/valkey/store.py +++ b/key-value/key-value-aio/src/key_value/aio/stores/valkey/store.py @@ -52,6 +52,21 @@ def __init__( username: str | None = None, password: str | None = None, ) -> None: + """Initialize the Valkey store. + + Args: + client: An existing Valkey client to use. If provided, the store will not manage + the client's lifecycle (will not close it). The caller is responsible for + managing the client's lifecycle. + default_collection: The default collection to use if no collection is provided. + host: Valkey host. Defaults to localhost. + port: Valkey port. Defaults to 6379. + db: Valkey database number. Defaults to 0. + username: Valkey username. Defaults to None. + password: Valkey password. Defaults to None. + """ + client_provided = client is not None + if client is not None: self._connected_client = client else: @@ -61,9 +76,11 @@ def __init__( self._client_config = GlideClientConfiguration(addresses=addresses, database_id=db, credentials=credentials) self._connected_client = None - self._stable_api = True - - super().__init__(default_collection=default_collection) + super().__init__( + default_collection=default_collection, + client_provided_by_user=client_provided, + stable_api=True, + ) @override async def _setup(self) -> None: @@ -75,6 +92,10 @@ async def _setup(self) -> None: self._connected_client = await GlideClient.create(config=self._client_config) + # Register client cleanup if we own the client + if not self._client_provided_by_user: + self._exit_stack.push_async_callback(self._client.close) + @property def _client(self) -> BaseClient: if self._connected_client is None: @@ -147,9 +168,3 @@ async def _delete_managed_entries(self, *, keys: Sequence[str], collection: str) deleted_count: int = await self._client.delete(keys=combo_keys) # pyright: ignore[reportArgumentType] return deleted_count - - @override - async def _close(self) -> None: - if self._connected_client is None: - return - await self._client.close() diff --git a/key-value/key-value-aio/tests/stores/base.py b/key-value/key-value-aio/tests/stores/base.py index 1aa3bc02..b28a6caf 100644 --- a/key-value/key-value-aio/tests/stores/base.py +++ b/key-value/key-value-aio/tests/stores/base.py @@ -1,6 +1,9 @@ import hashlib +import sys +import tempfile from abc import ABC, abstractmethod from collections.abc import AsyncGenerator +from pathlib import Path from typing import Any import pytest @@ -26,6 +29,17 @@ class BaseStoreTests(ABC): async def eventually_consistent(self) -> None: # noqa: B027 """Subclasses can override this to wait for eventually consistent operations.""" + @pytest.fixture + async def per_test_temp_dir(self) -> AsyncGenerator[Path, None]: + # ignore cleanup errors on Windows + if sys.platform == "win32": + ignore_cleanup_errors = True + else: + ignore_cleanup_errors = False + + with tempfile.TemporaryDirectory(ignore_cleanup_errors=ignore_cleanup_errors) as temp_dir: + yield Path(temp_dir) + @pytest.fixture @abstractmethod async def store(self) -> BaseStore | AsyncGenerator[BaseStore, None]: ... @@ -261,7 +275,7 @@ async def test_minimum_put_many_delete_many_performance(self, store: BaseStore): class ContextManagerStoreTestMixin: @pytest.fixture(params=[True, False], ids=["with_ctx_manager", "no_ctx_manager"], autouse=True) async def enter_exit_store( - self, request: pytest.FixtureRequest, store: BaseContextManagerStore + self, request: pytest.FixtureRequest, store: BaseContextManagerStore, per_test_temp_dir: Path ) -> AsyncGenerator[BaseContextManagerStore, None]: context_manager = request.param # pyright: ignore[reportAny] diff --git a/key-value/key-value-aio/tests/stores/disk/test_disk.py b/key-value/key-value-aio/tests/stores/disk/test_disk.py index 639451a8..5acddbc7 100644 --- a/key-value/key-value-aio/tests/stores/disk/test_disk.py +++ b/key-value/key-value-aio/tests/stores/disk/test_disk.py @@ -1,6 +1,5 @@ import json -import tempfile -from collections.abc import AsyncGenerator +from pathlib import Path import pytest from dirty_equals import IsDatetime @@ -15,21 +14,19 @@ class TestDiskStore(ContextManagerStoreTestMixin, BaseStoreTests): - @pytest.fixture(scope="session") - async def disk_store(self) -> AsyncGenerator[DiskStore, None]: - with tempfile.TemporaryDirectory() as temp_dir: - yield DiskStore(directory=temp_dir, max_size=TEST_SIZE_LIMIT) - @override @pytest.fixture - async def store(self, disk_store: DiskStore) -> DiskStore: + async def store(self, per_test_temp_dir: Path) -> DiskStore: + disk_store = DiskStore(directory=per_test_temp_dir, max_size=TEST_SIZE_LIMIT) + disk_store._cache.clear() # pyright: ignore[reportPrivateUsage] return disk_store @pytest.fixture - async def disk_cache(self, disk_store: DiskStore) -> Cache: - return disk_store._cache # pyright: ignore[reportPrivateUsage] + async def disk_cache(self, store: DiskStore) -> Cache: + assert isinstance(store._cache, Cache) + return store._cache # pyright: ignore[reportPrivateUsage] async def test_value_stored(self, store: DiskStore, disk_cache: Cache): await store.put(collection="test", key="test_key", value={"name": "Alice", "age": 30}) diff --git a/key-value/key-value-aio/tests/stores/disk/test_multi_disk.py b/key-value/key-value-aio/tests/stores/disk/test_multi_disk.py index 5d38e406..20122b49 100644 --- a/key-value/key-value-aio/tests/stores/disk/test_multi_disk.py +++ b/key-value/key-value-aio/tests/stores/disk/test_multi_disk.py @@ -1,5 +1,4 @@ import json -import tempfile from collections.abc import AsyncGenerator from pathlib import Path from typing import TYPE_CHECKING @@ -19,18 +18,16 @@ class TestMultiDiskStore(ContextManagerStoreTestMixin, BaseStoreTests): - @pytest.fixture(scope="session") - async def multi_disk_store(self) -> AsyncGenerator[MultiDiskStore, None]: - with tempfile.TemporaryDirectory() as temp_dir: - yield MultiDiskStore(base_directory=Path(temp_dir), max_size=TEST_SIZE_LIMIT) - @override @pytest.fixture - async def store(self, multi_disk_store: MultiDiskStore) -> MultiDiskStore: - for collection in multi_disk_store._cache: # pyright: ignore[reportPrivateUsage] - multi_disk_store._cache[collection].clear() # pyright: ignore[reportPrivateUsage] + async def store(self, per_test_temp_dir: Path) -> AsyncGenerator[MultiDiskStore, None]: + store = MultiDiskStore(base_directory=per_test_temp_dir, max_size=TEST_SIZE_LIMIT) + + yield store - return multi_disk_store + # Wipe the store after returning it + for collection in store._cache: # pyright: ignore[reportPrivateUsage] + store._cache[collection].clear() # pyright: ignore[reportPrivateUsage] async def test_value_stored(self, store: MultiDiskStore): await store.put(collection="test", key="test_key", value={"name": "Alice", "age": 30}) diff --git a/key-value/key-value-aio/tests/stores/duckdb/test_duckdb.py b/key-value/key-value-aio/tests/stores/duckdb/test_duckdb.py index 9b890e8d..bdd906bc 100644 --- a/key-value/key-value-aio/tests/stores/duckdb/test_duckdb.py +++ b/key-value/key-value-aio/tests/stores/duckdb/test_duckdb.py @@ -34,13 +34,13 @@ async def test_not_unbounded(self, store: BaseStore): ... class TestDuckDBStorePersistent(ContextManagerStoreTestMixin, BaseStoreTests): @override @pytest.fixture - async def store(self) -> AsyncGenerator[DuckDBStore, None]: + async def store(self, per_test_temp_dir: Path) -> AsyncGenerator[DuckDBStore, None]: """Test with persistent DuckDB database file.""" - with TemporaryDirectory() as temp_dir: - db_path = Path(temp_dir) / "test.db" - duckdb_store = DuckDBStore(database_path=db_path) - yield duckdb_store - await duckdb_store.close() + duckdb_store = DuckDBStore(database_path=per_test_temp_dir / "test.db") + + yield duckdb_store + + await duckdb_store.close() @pytest.mark.skip(reason="Local disk stores are unbounded") async def test_not_unbounded(self, store: BaseStore): ... diff --git a/key-value/key-value-aio/tests/stores/filetree/test_filetree.py b/key-value/key-value-aio/tests/stores/filetree/test_filetree.py index 14d57aba..823fdb30 100644 --- a/key-value/key-value-aio/tests/stores/filetree/test_filetree.py +++ b/key-value/key-value-aio/tests/stores/filetree/test_filetree.py @@ -1,7 +1,5 @@ """Tests for FileTreeStore.""" -import tempfile -from collections.abc import AsyncGenerator from pathlib import Path import pytest @@ -20,19 +18,17 @@ class TestFileTreeStore(BaseStoreTests): """Test suite for FileTreeStore.""" @pytest.fixture - async def store(self) -> AsyncGenerator[FileTreeStore, None]: + async def store(self, per_test_temp_dir: Path) -> FileTreeStore: """Create a FileTreeStore instance with a temporary directory. Uses V1 sanitization strategies to maintain backwards compatibility and pass tests that rely on sanitization for long/special names. """ - with tempfile.TemporaryDirectory() as temp_dir: - temp_path = Path(temp_dir) - yield FileTreeStore( - data_directory=temp_path, - key_sanitization_strategy=FileTreeV1KeySanitizationStrategy(directory=temp_path), - collection_sanitization_strategy=FileTreeV1CollectionSanitizationStrategy(directory=temp_path), - ) + return FileTreeStore( + data_directory=per_test_temp_dir, + key_sanitization_strategy=FileTreeV1KeySanitizationStrategy(directory=per_test_temp_dir), + collection_sanitization_strategy=FileTreeV1CollectionSanitizationStrategy(directory=per_test_temp_dir), + ) @override async def test_not_unbounded(self, store: BaseStore): diff --git a/key-value/key-value-aio/tests/stores/rocksdb/test_rocksdb.py b/key-value/key-value-aio/tests/stores/rocksdb/test_rocksdb.py index c49d5a74..ba1e9a0d 100644 --- a/key-value/key-value-aio/tests/stores/rocksdb/test_rocksdb.py +++ b/key-value/key-value-aio/tests/stores/rocksdb/test_rocksdb.py @@ -1,5 +1,4 @@ import json -from collections.abc import AsyncGenerator from pathlib import Path from tempfile import TemporaryDirectory @@ -18,13 +17,8 @@ class TestRocksDBStore(ContextManagerStoreTestMixin, BaseStoreTests): @override @pytest.fixture - async def store(self) -> AsyncGenerator[RocksDBStore, None]: - """Create a RocksDB store for testing.""" - # Create a temporary directory for the RocksDB database - with TemporaryDirectory() as temp_dir: - db_path = Path(temp_dir) / "test_db" - rocksdb_store = RocksDBStore(path=db_path) - yield rocksdb_store + async def store(self, per_test_temp_dir: Path) -> RocksDBStore: + return RocksDBStore(path=per_test_temp_dir / "test_db") async def test_rocksdb_path_connection(self): """Test RocksDB store creation with path.""" @@ -59,6 +53,8 @@ async def test_rocksdb_db_connection(self): assert result == {"test": "value"} await store.close() + # Close the user-provided database before cleanup + db.close() temp_dir.cleanup() @pytest.mark.skip(reason="Local disk stores are unbounded") diff --git a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/base.py b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/base.py index 1b3ff65e..3b059639 100644 --- a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/base.py +++ b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/base.py @@ -8,6 +8,7 @@ from abc import ABC, abstractmethod from collections import defaultdict from collections.abc import Mapping, Sequence +from contextlib import ExitStack from datetime import datetime from threading import Lock from types import MappingProxyType, TracebackType @@ -91,6 +92,7 @@ def __init__( collection_sanitization_strategy: SanitizationStrategy | None = None, default_collection: str | None = None, seed: SEED_DATA_TYPE | None = None, + stable_api: bool = False, ) -> None: """Initialize the managed key-value store. @@ -103,6 +105,8 @@ def __init__( seed: Optional seed data to pre-populate the store. Format: {collection: {key: {field: value, ...}}}. Seeding occurs once during store initialization (when the store is first entered or when the first operation is performed on the store). + stable_api: Whether this store implementation has a stable API. If False, a warning will be issued. + Defaults to False. """ self._setup_complete = False @@ -119,8 +123,7 @@ def __init__( self._key_sanitization_strategy = key_sanitization_strategy or PassthroughStrategy() self._collection_sanitization_strategy = collection_sanitization_strategy or PassthroughStrategy() - if not hasattr(self, "_stable_api"): - self._stable_api = False + self._stable_api = stable_api if not self._stable_api: self._warn_about_stability() @@ -397,22 +400,74 @@ def _get_collection_keys(self, *, collection: str, limit: int | None = None) -> class BaseContextManagerStore(BaseStore, ABC): - """An abstract base class for context manager stores.""" + """An abstract base class for context manager stores. + + Stores that accept a client parameter should pass `client_provided_by_user=True` to + the constructor. This ensures the store does not manage the lifecycle of user-provided + clients (i.e., does not close them). + + The base class provides an ExitStack that stores can use to register cleanup + callbacks. Stores should add their cleanup operations to the exit stack as needed. + The base class handles entering and exiting the exit stack. + """ + + _client_provided_by_user: bool + _exit_stack: ExitStack + _exit_stack_entered: bool + + def __init__(self, *, client_provided_by_user: bool = False, **kwargs: Any) -> None: + """Initialize the context manager store with client ownership configuration. + + Args: + client_provided_by_user: Whether the client was provided by the user. If True, + the store will not manage the client's lifecycle (will not close it). + Defaults to False. + **kwargs: Additional arguments to pass to the base store constructor. + """ + self._client_provided_by_user = client_provided_by_user + self._exit_stack = ExitStack() + self._exit_stack_entered = False + super().__init__(**kwargs) + + def _ensure_exit_stack_entered(self) -> None: + """Ensure the exit stack has been entered.""" + if not self._exit_stack_entered: + self._exit_stack.__enter__() + self._exit_stack_entered = True def __enter__(self) -> Self: + # Enter the exit stack + self._ensure_exit_stack_entered() self.setup() return self - def __exit__(self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) -> None: - self._close() + def __exit__( + self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None + ) -> bool | None: + # Close the exit stack, which handles all cleanup + if self._exit_stack_entered: + result = self._exit_stack.__exit__(exc_type, exc_value, traceback) + self._exit_stack_entered = False + + return result + return None def close(self) -> None: - self._close() + # Close the exit stack if it has been entered + if self._exit_stack_entered: + self._exit_stack.close() + self._exit_stack_entered = False - @abstractmethod - def _close(self) -> None: - """Close the store.""" - ... + def setup(self) -> None: + """Initialize the store if not already initialized. + + This override ensures the exit stack is entered before the store's _setup() + method is called, allowing stores to register cleanup callbacks during setup. + """ + # Ensure exit stack is entered + self._ensure_exit_stack_entered() + # Call parent setup + super().setup() class BaseEnumerateCollectionsStore(BaseStore, EnumerateCollectionsProtocol, ABC): diff --git a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/disk/multi_store.py b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/disk/multi_store.py index 9bcc33c2..7b401039 100644 --- a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/disk/multi_store.py +++ b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/disk/multi_store.py @@ -102,10 +102,14 @@ def default_disk_cache_factory(collection: str) -> Cache: self._cache = {} - self._stable_api = True self._serialization_adapter = BasicSerializationAdapter() - super().__init__(default_collection=default_collection) + super().__init__(default_collection=default_collection, stable_api=True) + + @override + def _setup(self) -> None: + """Register cache cleanup.""" + self._exit_stack.callback(self._sync_close) @override def _setup_collection(self, *, collection: str) -> None: @@ -143,9 +147,5 @@ def _sync_close(self) -> None: for cache in self._cache.values(): cache.close() - @override - def _close(self) -> None: - self._sync_close() - def __del__(self) -> None: self._sync_close() diff --git a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/disk/store.py b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/disk/store.py index 07c0b929..d1d7ae71 100644 --- a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/disk/store.py +++ b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/disk/store.py @@ -53,7 +53,9 @@ def __init__( """Initialize the disk store. Args: - disk_cache: An existing diskcache Cache instance to use. + disk_cache: An existing diskcache Cache instance to use. If provided, the store will + not manage the cache's lifecycle (will not close it). The caller is responsible + for managing the cache's lifecycle. directory: The directory to use for the disk store. max_size: The maximum size of the disk store. default_collection: The default collection to use if no collection is provided. @@ -66,6 +68,9 @@ def __init__( msg = "Either disk_cache or directory must be provided" raise ValueError(msg) + client_provided = disk_cache is not None + self._client_provided_by_user = client_provided + if disk_cache: self._cache = disk_cache elif directory: @@ -78,9 +83,13 @@ def __init__( else: self._cache = Cache(directory=directory, eviction_policy="none") - self._stable_api = True + super().__init__(default_collection=default_collection, client_provided_by_user=client_provided, stable_api=True) - super().__init__(default_collection=default_collection) + @override + def _setup(self) -> None: + """Register cache cleanup if we own the cache.""" + if not self._client_provided_by_user: + self._exit_stack.callback(self._cache.close) @override def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry | None: @@ -116,9 +125,6 @@ def _delete_managed_entry(self, *, key: str, collection: str) -> bool: return self._cache.delete(key=combo_key, retry=True) - @override - def _close(self) -> None: - self._cache.close() - def __del__(self) -> None: - self._cache.close() + if not getattr(self, "_client_provided_by_user", False) and hasattr(self, "_cache"): + self._cache.close() diff --git a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/duckdb/store.py b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/duckdb/store.py index 15c7713f..7232fbff 100644 --- a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/duckdb/store.py +++ b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/duckdb/store.py @@ -74,15 +74,9 @@ class DuckDBStore(BaseContextManagerStore, BaseStore): Values are stored in a JSON column as native dicts, allowing direct SQL queries on the stored data for analytics and reporting. - - Note on connection ownership: When you provide an existing connection, the store - will take ownership and close it when the store is closed or garbage collected. - If you need to reuse a connection, create separate DuckDB connections for each store. """ _connection: duckdb.DuckDBPyConnection - _is_closed: bool - _owns_connection: bool _adapter: SerializationAdapter _table_name: str @@ -97,9 +91,8 @@ def __init__( ) -> None: """Initialize the DuckDB store with an existing connection. - Warning: The store will take ownership of the connection and close it - when the store is closed or garbage collected. If you need to reuse - a connection, create separate DuckDB connections for each store. + Note: If you provide a connection, the store will NOT manage its lifecycle (will not + close it). The caller is responsible for managing the connection's lifecycle. Args: connection: An existing DuckDB connection to use. @@ -138,7 +131,9 @@ def __init__( """Initialize the DuckDB store. Args: - connection: An existing DuckDB connection to use. + connection: An existing DuckDB connection to use. If provided, the store will NOT + manage its lifecycle (will not close it). The caller is responsible for managing + the connection's lifecycle. database_path: Path to the database file. If None or ':memory:', uses in-memory database. table_name: Name of the table to store key-value entries. Defaults to "kv_entries". default_collection: The default collection to use if no collection is provided. @@ -148,9 +143,10 @@ def __init__( msg = "Provide only one of connection or database_path" raise ValueError(msg) + client_provided = connection is not None + if connection is not None: self._connection = connection - self._owns_connection = True # We take ownership even of provided connections else: # Convert Path to string if needed if isinstance(database_path, Path): @@ -160,9 +156,7 @@ def __init__( self._connection = duckdb.connect(":memory:") else: self._connection = duckdb.connect(database=database_path) - self._owns_connection = True - self._is_closed = False self._adapter = DuckDBSerializationAdapter() # Validate table name to prevent SQL injection @@ -170,9 +164,8 @@ def __init__( msg = "Table name must start with a letter or underscore and contain only letters, digits, or underscores" raise ValueError(msg) self._table_name = table_name - self._stable_api = False - super().__init__(default_collection=default_collection, seed=seed) + super().__init__(default_collection=default_collection, seed=seed, client_provided_by_user=client_provided, stable_api=False) def _get_create_table_sql(self) -> str: """Generate SQL for creating the key-value entries table. @@ -237,6 +230,10 @@ def _setup(self) -> None: - Metadata queries without JSON deserialization - Native JSON column support for rich querying capabilities """ + # Register connection cleanup if we own the connection + if not self._client_provided_by_user: + self._exit_stack.callback(self._connection.close) + # Create the main table for storing key-value entries self._connection.execute(self._get_create_table_sql()) @@ -253,10 +250,6 @@ def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry | Non Reconstructs the ManagedEntry from value column and metadata columns using the serialization adapter. """ - if self._is_closed: - msg = "Cannot operate on closed DuckDBStore" - raise RuntimeError(msg) - result = self._connection.execute(self._get_select_sql(), [collection, key]).fetchone() if result is None: @@ -283,10 +276,6 @@ def _put_managed_entry(self, *, key: str, collection: str, managed_entry: Manage Uses the serialization adapter to convert the ManagedEntry to the appropriate storage format. """ - if self._is_closed: - msg = "Cannot operate on closed DuckDBStore" - raise RuntimeError(msg) - # Ensure that the value is serializable to JSON _ = managed_entry.value_as_json @@ -302,29 +291,8 @@ def _put_managed_entry(self, *, key: str, collection: str, managed_entry: Manage @override def _delete_managed_entry(self, *, key: str, collection: str) -> bool: """Delete a managed entry by key from the specified collection.""" - if self._is_closed: - msg = "Cannot operate on closed DuckDBStore" - raise RuntimeError(msg) - result = self._connection.execute(self._get_delete_sql(), [collection, key]) # Check if any rows were deleted by counting returned rows deleted_rows = result.fetchall() return len(deleted_rows) > 0 - - @override - def _close(self) -> None: - """Close the DuckDB connection.""" - if not self._is_closed and self._owns_connection: - self._connection.close() - self._is_closed = True - - def __del__(self) -> None: - """Clean up the DuckDB connection on deletion.""" - try: - if not self._is_closed and self._owns_connection and hasattr(self, "_connection"): - self._connection.close() - self._is_closed = True - except Exception: # noqa: S110 - # Suppress errors during cleanup to avoid issues during interpreter shutdown - pass diff --git a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/elasticsearch/store.py b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/elasticsearch/store.py index cbd016cb..83b6c704 100644 --- a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/elasticsearch/store.py +++ b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/elasticsearch/store.py @@ -187,7 +187,9 @@ def __init__( """Initialize the elasticsearch store. Args: - elasticsearch_client: The elasticsearch client to use. + elasticsearch_client: The elasticsearch client to use. If provided, the store will not + manage the client's lifecycle (will not close it). The caller is responsible for + managing the client's lifecycle. url: The url of the elasticsearch cluster. api_key: The api key to use. index_prefix: The index prefix to use. Collections will be prefixed with this prefix. @@ -199,6 +201,8 @@ def __init__( msg = "Either elasticsearch_client or url must be provided" raise ValueError(msg) + client_provided = elasticsearch_client is not None + if elasticsearch_client: self._client = elasticsearch_client elif url: @@ -222,10 +226,15 @@ def __init__( default_collection=default_collection, collection_sanitization_strategy=collection_sanitization_strategy, key_sanitization_strategy=key_sanitization_strategy, + client_provided_by_user=client_provided, ) @override def _setup(self) -> None: + # Register client cleanup if we own the client + if not self._client_provided_by_user: + self._exit_stack.callback(self._client.close) + cluster_info = self._client.options(ignore_status=404).info() self._is_serverless = cluster_info.get("version", {}).get("build_flavor") == "serverless" @@ -472,7 +481,3 @@ def _cull(self) -> None: _ = self._client.options(ignore_status=404).delete_by_query( index=f"{self._index_prefix}-*", body={"query": {"range": {"expires_at": {"lt": ms_epoch}}}} ) - - @override - def _close(self) -> None: - self._client.close() diff --git a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/memory/store.py b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/memory/store.py index 89575d86..08f9731b 100644 --- a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/memory/store.py +++ b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/memory/store.py @@ -112,9 +112,7 @@ def __init__( self._cache = {} - self._stable_api = True - - super().__init__(default_collection=default_collection, seed=seed) + super().__init__(default_collection=default_collection, seed=seed, stable_api=True) @override def _setup(self) -> None: diff --git a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/mongodb/store.py b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/mongodb/store.py index a66c7178..b9b5d013 100644 --- a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/mongodb/store.py +++ b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/mongodb/store.py @@ -11,7 +11,7 @@ from key_value.shared.utils.sanitization import HybridSanitizationStrategy, SanitizationStrategy from key_value.shared.utils.sanitize import ALPHANUMERIC_CHARACTERS from key_value.shared.utils.serialization import SerializationAdapter -from typing_extensions import Self, override +from typing_extensions import override from key_value.sync.code_gen.stores.base import BaseContextManagerStore, BaseDestroyCollectionStore, BaseStore @@ -157,7 +157,9 @@ def __init__( Values are stored as native BSON dictionaries for better query support and performance. Args: - client: The MongoDB client to use (mutually exclusive with url). + client: The MongoDB client to use (mutually exclusive with url). If provided, the store + will not manage the client's lifecycle (will not enter/exit its context manager or + close it). The caller is responsible for managing the client's lifecycle. url: The url of the MongoDB cluster (mutually exclusive with client). db_name: The name of the MongoDB database. coll_name: The name of the MongoDB collection. @@ -165,6 +167,8 @@ def __init__( collection_sanitization_strategy: The sanitization strategy to use for collections. """ + client_provided = client is not None + if client: self._client = client elif url: @@ -180,18 +184,17 @@ def __init__( self._collections_by_name = {} self._adapter = MongoDBSerializationAdapter() - super().__init__(default_collection=default_collection, collection_sanitization_strategy=collection_sanitization_strategy) - - @override - def __enter__(self) -> Self: - _ = self._client.__enter__() - super().__enter__() - return self + super().__init__( + default_collection=default_collection, + collection_sanitization_strategy=collection_sanitization_strategy, + client_provided_by_user=client_provided, + ) @override - def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: # pyright: ignore[reportAny] - super().__exit__(exc_type, exc_val, exc_tb) - self._client.__exit__(exc_type, exc_val, exc_tb) + def _setup(self) -> None: + """Register client cleanup if we own the client.""" + if not self._client_provided_by_user: + self._exit_stack.enter_context(self._client) @override def _setup_collection(self, *, collection: str) -> None: @@ -306,6 +309,5 @@ def _delete_collection(self, *, collection: str) -> bool: return True - @override - def _close(self) -> None: - self._client.close() + +# No need to override _close - the exit stack handles all cleanup automatically diff --git a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/redis/store.py b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/redis/store.py index 6885cc4e..58db4380 100644 --- a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/redis/store.py +++ b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/redis/store.py @@ -57,7 +57,9 @@ def __init__( """Initialize the Redis store. Args: - client: An existing Redis client to use. + client: An existing Redis client to use. If provided, the store will not manage + the client's lifecycle (will not close it). The caller is responsible for + managing the client's lifecycle. url: Redis URL (e.g., redis://localhost:6379/0). host: Redis host. Defaults to localhost. port: Redis port. Defaults to 6379. @@ -65,6 +67,8 @@ def __init__( password: Redis password. Defaults to None. default_collection: The default collection to use if no collection is provided. """ + client_provided = client is not None + if client: self._client = client elif url: @@ -79,10 +83,9 @@ def __init__( else: self._client = Redis(host=host, port=port, db=db, password=password, decode_responses=True) - self._stable_api = True self._adapter = BasicSerializationAdapter(date_format="isoformat", value_format="dict") - super().__init__(default_collection=default_collection) + super().__init__(default_collection=default_collection, client_provided_by_user=client_provided, stable_api=True) @override def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry | None: @@ -203,9 +206,11 @@ def _get_collection_keys(self, *, collection: str, limit: int | None = None) -> return get_keys_from_compound_keys(compound_keys=keys, collection=collection) @override - def _delete_store(self) -> bool: - return self._client.flushdb() # pyright: ignore[reportUnknownMemberType, reportAny] + def _setup(self) -> None: + """Register client cleanup if we own the client.""" + if not self._client_provided_by_user: + self._exit_stack.callback(self._client.close) @override - def _close(self) -> None: - self._client.close() + def _delete_store(self) -> bool: + return self._client.flushdb() # pyright: ignore[reportUnknownMemberType, reportAny] diff --git a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/rocksdb/store.py b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/rocksdb/store.py index 5b1535eb..31f1bed3 100644 --- a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/rocksdb/store.py +++ b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/rocksdb/store.py @@ -4,9 +4,8 @@ from collections.abc import Sequence from datetime import datetime from pathlib import Path -from typing import Any, overload +from typing import overload -from key_value.shared.errors.store import KeyValueStoreError from key_value.shared.utils.compound import compound_key from key_value.shared.utils.managed_entry import ManagedEntry from typing_extensions import override @@ -24,7 +23,6 @@ class RocksDBStore(BaseContextManagerStore, BaseStore): """A RocksDB-based key-value store.""" _db: Rdict - _is_closed: bool @overload def __init__(self, *, db: Rdict, default_collection: str | None = None) -> None: @@ -48,7 +46,9 @@ def __init__(self, *, db: Rdict | None = None, path: Path | str | None = None, d """Initialize the RocksDB store. Args: - db: An existing Rdict database instance to use. + db: An existing Rdict database instance to use. If provided, the store will NOT + manage its lifecycle (will not close it). The caller is responsible for managing + the database's lifecycle. path: The path to the RocksDB database directory. default_collection: The default collection to use if no collection is provided. """ @@ -60,6 +60,8 @@ def __init__(self, *, db: Rdict | None = None, path: Path | str | None = None, d msg = "Either db or path must be provided" raise ValueError(msg) + client_provided = db is not None + if db: self._db = db elif path: @@ -71,33 +73,21 @@ def __init__(self, *, db: Rdict | None = None, path: Path | str | None = None, d self._db = Rdict(str(path), options=opts) - self._is_closed = False - - super().__init__(default_collection=default_collection) + super().__init__(default_collection=default_collection, client_provided_by_user=client_provided) @override - def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: # pyright: ignore[reportAny] - super().__exit__(exc_type, exc_val, exc_tb) - self._close() - - @override - def _close(self) -> None: - self._close_and_flush() + def _setup(self) -> None: + """Register database cleanup if we own the database.""" + if not self._client_provided_by_user: + # Register a callback to close and flush the database + self._exit_stack.callback(self._close_and_flush) def _close_and_flush(self) -> None: - if not self._is_closed: - self._db.flush() - self._db.close() - self._is_closed = True - - def _fail_on_closed_store(self) -> None: - if self._is_closed: - raise KeyValueStoreError(message="Operation attempted on closed store") + self._db.flush() + self._db.close() @override def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry | None: - self._fail_on_closed_store() - combo_key: str = compound_key(collection=collection, key=key) value: bytes | None = self._db.get(combo_key) @@ -112,8 +102,6 @@ def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry | Non @override def _put_managed_entry(self, *, key: str, collection: str, managed_entry: ManagedEntry) -> None: - self._fail_on_closed_store() - combo_key: str = compound_key(collection=collection, key=key) json_value: str = self._serialization_adapter.dump_json(entry=managed_entry, key=key, collection=collection) @@ -130,8 +118,6 @@ def _put_managed_entries( created_at: datetime, expires_at: datetime | None, ) -> None: - self._fail_on_closed_store() - if not keys: return @@ -145,8 +131,6 @@ def _put_managed_entries( @override def _delete_managed_entry(self, *, key: str, collection: str) -> bool: - self._fail_on_closed_store() - combo_key: str = compound_key(collection=collection, key=key) # Check if key exists before deleting, this is only used for tracking deleted count @@ -158,8 +142,6 @@ def _delete_managed_entry(self, *, key: str, collection: str) -> bool: @override def _delete_managed_entries(self, *, keys: Sequence[str], collection: str) -> int: - self._fail_on_closed_store() - if not keys: return 0 @@ -180,6 +162,3 @@ def _delete_managed_entries(self, *, keys: Sequence[str], collection: str) -> in self._db.write(batch) return deleted_count - - def __del__(self) -> None: - self._close_and_flush() diff --git a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/valkey/store.py b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/valkey/store.py index 888029d0..e98153a9 100644 --- a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/valkey/store.py +++ b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/valkey/store.py @@ -54,6 +54,21 @@ def __init__( username: str | None = None, password: str | None = None, ) -> None: + """Initialize the Valkey store. + + Args: + client: An existing Valkey client to use. If provided, the store will not manage + the client's lifecycle (will not close it). The caller is responsible for + managing the client's lifecycle. + default_collection: The default collection to use if no collection is provided. + host: Valkey host. Defaults to localhost. + port: Valkey port. Defaults to 6379. + db: Valkey database number. Defaults to 0. + username: Valkey username. Defaults to None. + password: Valkey password. Defaults to None. + """ + client_provided = client is not None + if client is not None: self._connected_client = client else: @@ -63,9 +78,7 @@ def __init__( self._client_config = GlideClientConfiguration(addresses=addresses, database_id=db, credentials=credentials) self._connected_client = None - self._stable_api = True - - super().__init__(default_collection=default_collection) + super().__init__(default_collection=default_collection, client_provided_by_user=client_provided, stable_api=True) @override def _setup(self) -> None: @@ -77,6 +90,10 @@ def _setup(self) -> None: self._connected_client = GlideClient.create(config=self._client_config) + # Register client cleanup if we own the client + if not self._client_provided_by_user: + self._exit_stack.callback(self._client.close) + @property def _client(self) -> BaseClient: if self._connected_client is None: @@ -143,9 +160,3 @@ def _delete_managed_entries(self, *, keys: Sequence[str], collection: str) -> in deleted_count: int = self._client.delete(keys=combo_keys) # pyright: ignore[reportArgumentType] return deleted_count - - @override - def _close(self) -> None: - if self._connected_client is None: - return - self._client.close() diff --git a/key-value/key-value-sync/tests/code_gen/stores/base.py b/key-value/key-value-sync/tests/code_gen/stores/base.py index 19dc108b..f39f9e5d 100644 --- a/key-value/key-value-sync/tests/code_gen/stores/base.py +++ b/key-value/key-value-sync/tests/code_gen/stores/base.py @@ -2,8 +2,11 @@ # from the original file 'base.py' # DO NOT CHANGE! Change the original file instead. import hashlib +import sys +import tempfile from abc import ABC, abstractmethod from collections.abc import Generator +from pathlib import Path from typing import Any import pytest @@ -23,6 +26,17 @@ class BaseStoreTests(ABC): def eventually_consistent(self) -> None: # noqa: B027 "Subclasses can override this to wait for eventually consistent operations." + @pytest.fixture + def per_test_temp_dir(self) -> Generator[Path, None, None]: + # ignore cleanup errors on Windows + if sys.platform == "win32": + ignore_cleanup_errors = True + else: + ignore_cleanup_errors = False + + with tempfile.TemporaryDirectory(ignore_cleanup_errors=ignore_cleanup_errors) as temp_dir: + yield Path(temp_dir) + @pytest.fixture @abstractmethod def store(self) -> BaseStore | Generator[BaseStore, None, None]: ... @@ -258,7 +272,7 @@ def test_minimum_put_many_delete_many_performance(self, store: BaseStore): class ContextManagerStoreTestMixin: @pytest.fixture(params=[True, False], ids=["with_ctx_manager", "no_ctx_manager"], autouse=True) def enter_exit_store( - self, request: pytest.FixtureRequest, store: BaseContextManagerStore + self, request: pytest.FixtureRequest, store: BaseContextManagerStore, per_test_temp_dir: Path ) -> Generator[BaseContextManagerStore, None, None]: context_manager = request.param # pyright: ignore[reportAny] diff --git a/key-value/key-value-sync/tests/code_gen/stores/disk/test_disk.py b/key-value/key-value-sync/tests/code_gen/stores/disk/test_disk.py index 990f8984..2c73597c 100644 --- a/key-value/key-value-sync/tests/code_gen/stores/disk/test_disk.py +++ b/key-value/key-value-sync/tests/code_gen/stores/disk/test_disk.py @@ -2,8 +2,7 @@ # from the original file 'test_disk.py' # DO NOT CHANGE! Change the original file instead. import json -import tempfile -from collections.abc import Generator +from pathlib import Path import pytest from dirty_equals import IsDatetime @@ -18,21 +17,19 @@ class TestDiskStore(ContextManagerStoreTestMixin, BaseStoreTests): - @pytest.fixture(scope="session") - def disk_store(self) -> Generator[DiskStore, None, None]: - with tempfile.TemporaryDirectory() as temp_dir: - yield DiskStore(directory=temp_dir, max_size=TEST_SIZE_LIMIT) - @override @pytest.fixture - def store(self, disk_store: DiskStore) -> DiskStore: + def store(self, per_test_temp_dir: Path) -> DiskStore: + disk_store = DiskStore(directory=per_test_temp_dir, max_size=TEST_SIZE_LIMIT) + disk_store._cache.clear() # pyright: ignore[reportPrivateUsage] return disk_store @pytest.fixture - def disk_cache(self, disk_store: DiskStore) -> Cache: - return disk_store._cache # pyright: ignore[reportPrivateUsage] + def disk_cache(self, store: DiskStore) -> Cache: + assert isinstance(store._cache, Cache) + return store._cache # pyright: ignore[reportPrivateUsage] def test_value_stored(self, store: DiskStore, disk_cache: Cache): store.put(collection="test", key="test_key", value={"name": "Alice", "age": 30}) diff --git a/key-value/key-value-sync/tests/code_gen/stores/disk/test_multi_disk.py b/key-value/key-value-sync/tests/code_gen/stores/disk/test_multi_disk.py index c61ea34d..6820301a 100644 --- a/key-value/key-value-sync/tests/code_gen/stores/disk/test_multi_disk.py +++ b/key-value/key-value-sync/tests/code_gen/stores/disk/test_multi_disk.py @@ -2,7 +2,6 @@ # from the original file 'test_multi_disk.py' # DO NOT CHANGE! Change the original file instead. import json -import tempfile from collections.abc import Generator from pathlib import Path from typing import TYPE_CHECKING @@ -22,18 +21,16 @@ class TestMultiDiskStore(ContextManagerStoreTestMixin, BaseStoreTests): - @pytest.fixture(scope="session") - def multi_disk_store(self) -> Generator[MultiDiskStore, None, None]: - with tempfile.TemporaryDirectory() as temp_dir: - yield MultiDiskStore(base_directory=Path(temp_dir), max_size=TEST_SIZE_LIMIT) - @override @pytest.fixture - def store(self, multi_disk_store: MultiDiskStore) -> MultiDiskStore: - for collection in multi_disk_store._cache: # pyright: ignore[reportPrivateUsage] - multi_disk_store._cache[collection].clear() # pyright: ignore[reportPrivateUsage] + def store(self, per_test_temp_dir: Path) -> Generator[MultiDiskStore, None, None]: + store = MultiDiskStore(base_directory=per_test_temp_dir, max_size=TEST_SIZE_LIMIT) + + yield store - return multi_disk_store + # Wipe the store after returning it + for collection in store._cache: # pyright: ignore[reportPrivateUsage] + store._cache[collection].clear() # pyright: ignore[reportPrivateUsage] def test_value_stored(self, store: MultiDiskStore): store.put(collection="test", key="test_key", value={"name": "Alice", "age": 30}) diff --git a/key-value/key-value-sync/tests/code_gen/stores/duckdb/test_duckdb.py b/key-value/key-value-sync/tests/code_gen/stores/duckdb/test_duckdb.py index ed88d3ec..51f5e54a 100644 --- a/key-value/key-value-sync/tests/code_gen/stores/duckdb/test_duckdb.py +++ b/key-value/key-value-sync/tests/code_gen/stores/duckdb/test_duckdb.py @@ -37,13 +37,13 @@ def test_not_unbounded(self, store: BaseStore): ... class TestDuckDBStorePersistent(ContextManagerStoreTestMixin, BaseStoreTests): @override @pytest.fixture - def store(self) -> Generator[DuckDBStore, None, None]: + def store(self, per_test_temp_dir: Path) -> Generator[DuckDBStore, None, None]: """Test with persistent DuckDB database file.""" - with TemporaryDirectory() as temp_dir: - db_path = Path(temp_dir) / "test.db" - duckdb_store = DuckDBStore(database_path=db_path) - yield duckdb_store - duckdb_store.close() + duckdb_store = DuckDBStore(database_path=per_test_temp_dir / "test.db") + + yield duckdb_store + + duckdb_store.close() @pytest.mark.skip(reason="Local disk stores are unbounded") def test_not_unbounded(self, store: BaseStore): ... diff --git a/key-value/key-value-sync/tests/code_gen/stores/rocksdb/test_rocksdb.py b/key-value/key-value-sync/tests/code_gen/stores/rocksdb/test_rocksdb.py index 95c2e90f..fd410bb8 100644 --- a/key-value/key-value-sync/tests/code_gen/stores/rocksdb/test_rocksdb.py +++ b/key-value/key-value-sync/tests/code_gen/stores/rocksdb/test_rocksdb.py @@ -2,7 +2,6 @@ # from the original file 'test_rocksdb.py' # DO NOT CHANGE! Change the original file instead. import json -from collections.abc import Generator from pathlib import Path from tempfile import TemporaryDirectory @@ -21,13 +20,8 @@ class TestRocksDBStore(ContextManagerStoreTestMixin, BaseStoreTests): @override @pytest.fixture - def store(self) -> Generator[RocksDBStore, None, None]: - """Create a RocksDB store for testing.""" - # Create a temporary directory for the RocksDB database - with TemporaryDirectory() as temp_dir: - db_path = Path(temp_dir) / "test_db" - rocksdb_store = RocksDBStore(path=db_path) - yield rocksdb_store + def store(self, per_test_temp_dir: Path) -> RocksDBStore: + return RocksDBStore(path=per_test_temp_dir / "test_db") def test_rocksdb_path_connection(self): """Test RocksDB store creation with path.""" @@ -62,6 +56,8 @@ def test_rocksdb_db_connection(self): assert result == {"test": "value"} store.close() + # Close the user-provided database before cleanup + db.close() temp_dir.cleanup() @pytest.mark.skip(reason="Local disk stores are unbounded") diff --git a/scripts/build_sync_library.py b/scripts/build_sync_library.py index e1f6d9a4..a054f4ac 100644 --- a/scripts/build_sync_library.py +++ b/scripts/build_sync_library.py @@ -224,6 +224,7 @@ class RenameAsyncToSync(ast.NodeTransformer): # type: ignore "AsyncDatabase": "Database", "AsyncCollection": "Collection", "AsyncMongoClient": "MongoClient", + "AsyncExitStack": "ExitStack", "redis.asyncio": "redis", "redis.asyncio.client": "redis.client", "glide_shared.config": "glide_sync.config", @@ -238,12 +239,14 @@ class RenameAsyncToSync(ast.NodeTransformer): # type: ignore "AsyncKeyValue": "KeyValue", "AsyncGenerator": "Generator", "aclose": "close", + "enter_async_context": "enter_context", "asyncio.sleep": "time.sleep", "async_running_in_event_loop": "running_in_event_loop", "asleep": "sleep", "async_wait_for_true": "wait_for_true", "async_retry_operation": "retry_operation", "async_gather": "gather", + "push_async_callback": "callback", } _skip_imports: ClassVar[dict[str, set[str]]] = { "acompat": {"alist", "anext"},