Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ This monorepo contains two libraries:

## Why use this library?

- **Multiple backends**: DynamoDB, Elasticsearch, Memcached, MongoDB, Redis,
RocksDB, Valkey, and In-memory, Disk, etc
- **Multiple backends**: Aerospike, DynamoDB, Elasticsearch, Memcached, MongoDB,
Redis, RocksDB, Valkey, and In-memory, Disk, etc
- **TTL support**: Automatic expiration handling across all store types
- **Type-safe**: Full type hints with Protocol-based interfaces
- **Adapters**: Pydantic model support, raise-on-missing behavior, etc
Expand Down Expand Up @@ -132,7 +132,7 @@ pip install py-key-value-aio[memory]
pip install py-key-value-aio[disk]
pip install py-key-value-aio[dynamodb]
pip install py-key-value-aio[elasticsearch]
# or: redis, mongodb, memcached, valkey, vault, registry, rocksdb, see below for all options
# or: aerospike, redis, mongodb, memcached, valkey, vault, registry, rocksdb, see below for all options
```

```python
Expand Down
1 change: 1 addition & 0 deletions key-value/key-value-aio/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ dynamodb = ["aioboto3>=13.3.0", "types-aiobotocore-dynamodb>=2.16.0"]
keyring = ["keyring>=25.6.0"]
keyring-linux = ["keyring>=25.6.0", "dbus-python>=1.4.0"]
pydantic = ["pydantic>=2.11.9"]
aerospike = ["aerospike>=14.0.0"]
rocksdb = [
"rocksdict>=0.3.24 ; python_version >= '3.12'", # RocksDB 0.3.24 is the first version to support Python 3.13
"rocksdict>=0.3.2 ; python_version < '3.12'"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from key_value.aio.stores.aerospike.store import AerospikeStore

__all__ = ["AerospikeStore"]
228 changes: 228 additions & 0 deletions key-value/key-value-aio/src/key_value/aio/stores/aerospike/store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
from typing import Any, overload

from key_value.shared.errors import DeserializationError
from key_value.shared.type_checking.bear_spray import bear_spray
from key_value.shared.utils.compound import compound_key, get_keys_from_compound_keys
from key_value.shared.utils.managed_entry import ManagedEntry
from key_value.shared.utils.serialization import BasicSerializationAdapter, SerializationAdapter
from typing_extensions import override

from key_value.aio.stores.base import BaseContextManagerStore, BaseDestroyStore, BaseEnumerateKeysStore, BaseStore

try:
import aerospike # pyright: ignore[reportMissingImports]
except ImportError as e:
msg = "AerospikeStore requires py-key-value-aio[aerospike]"
raise ImportError(msg) from e

DEFAULT_NAMESPACE = "test"
DEFAULT_SET = "kv-store"
DEFAULT_PAGE_SIZE = 10000
PAGE_LIMIT = 10000


class AerospikeStore(BaseDestroyStore, BaseEnumerateKeysStore, BaseContextManagerStore, BaseStore):
"""Aerospike-based key-value store."""

_client: aerospike.Client # pyright: ignore[reportUnknownMemberType]
_namespace: str
_set: str
_adapter: SerializationAdapter

@overload
def __init__(
self,
*,
client: aerospike.Client, # pyright: ignore[reportUnknownMemberType, reportUnknownParameterType]
namespace: str = DEFAULT_NAMESPACE,
set_name: str = DEFAULT_SET,
default_collection: str | None = None,
) -> None: ...

@overload
def __init__(
self,
*,
hosts: list[tuple[str, int]] | None = None,
namespace: str = DEFAULT_NAMESPACE,
set_name: str = DEFAULT_SET,
default_collection: str | None = None,
) -> None: ...

@bear_spray
def __init__(
self,
*,
client: aerospike.Client | None = None, # pyright: ignore[reportUnknownMemberType, reportUnknownParameterType]
hosts: list[tuple[str, int]] | None = None,
namespace: str = DEFAULT_NAMESPACE,
set_name: str = DEFAULT_SET,
default_collection: str | None = None,
) -> None:
"""Initialize the Aerospike store.

Args:
client: An existing Aerospike client to use.
hosts: List of (host, port) tuples. Defaults to [("localhost", 3000)].
namespace: Aerospike namespace. Defaults to "test".
set_name: Aerospike set. Defaults to "kv-store".
default_collection: The default collection to use if no collection is provided.
"""
if client:
self._client = client
else:
hosts = hosts or [("localhost", 3000)]
config = {"hosts": hosts}
self._client = self._create_client(config) # pyright: ignore[reportUnknownMemberType]

self._namespace = namespace
self._set = set_name

self._stable_api = True
self._adapter = BasicSerializationAdapter(date_format="isoformat", value_format="dict")

super().__init__(default_collection=default_collection)

# Helper methods to encapsulate aerospike client interactions with type ignore comments

def _create_client(self, config: dict[str, Any]) -> aerospike.Client: # pyright: ignore[reportUnknownMemberType, reportUnknownParameterType]
"""Create an Aerospike client."""
return aerospike.client(config) # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]

def _connect_client(self) -> None:
"""Connect the Aerospike client."""
self._client.connect() # pyright: ignore[reportUnknownMemberType]

def _get_record(self, aerospike_key: tuple[str, str, str]) -> tuple[Any, Any, dict[str, Any]] | None:
"""Get a record from Aerospike.

Returns:
Tuple of (key, metadata, bins) or None if not found.
"""
try:
return self._client.get(aerospike_key) # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
except aerospike.exception.RecordNotFound: # pyright: ignore[reportUnknownMemberType]
return None

def _put_record(self, aerospike_key: tuple[str, str, str], bins: dict[str, Any], meta: dict[str, Any] | None = None) -> None:
"""Put a record into Aerospike."""
if meta:
self._client.put(aerospike_key, bins, meta=meta) # pyright: ignore[reportUnknownMemberType]
else:
self._client.put(aerospike_key, bins) # pyright: ignore[reportUnknownMemberType]

def _remove_record(self, aerospike_key: tuple[str, str, str]) -> bool:
"""Remove a record from Aerospike.

Returns:
True if the record was deleted, False if it didn't exist.
"""
try:
self._client.remove(aerospike_key) # pyright: ignore[reportUnknownMemberType]
except aerospike.exception.RecordNotFound: # pyright: ignore[reportUnknownMemberType]
return False
else:
return True

def _scan_set(self, callback: Any) -> None: # pyright: ignore[reportAny]
"""Scan the entire set with a callback function."""
scan = self._client.scan(self._namespace, self._set) # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
scan.foreach(callback) # pyright: ignore[reportUnknownMemberType]

def _truncate_set(self) -> None:
"""Truncate the set (delete all records)."""
self._client.truncate(self._namespace, self._set, 0) # pyright: ignore[reportUnknownMemberType]

def _close_client(self) -> None:
"""Close the Aerospike client connection."""
self._client.close() # pyright: ignore[reportUnknownMemberType]

@override
async def _setup(self) -> None:
"""Connect to Aerospike."""
self._connect_client()

@override
async def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry | None:
combo_key: str = compound_key(collection=collection, key=key)
aerospike_key = (self._namespace, self._set, combo_key)

record = self._get_record(aerospike_key)
if record is None:
return None

(_key, _metadata, bins) = record
json_value: str | None = bins.get("value")

if not isinstance(json_value, str):
return None

try:
return self._adapter.load_json(json_str=json_value)
except DeserializationError:
return None

@override
async def _put_managed_entry(
self,
*,
key: str,
collection: str,
managed_entry: ManagedEntry,
) -> None:
combo_key: str = compound_key(collection=collection, key=key)
aerospike_key = (self._namespace, self._set, combo_key)
json_value: str = self._adapter.dump_json(entry=managed_entry, key=key, collection=collection)

bins = {"value": json_value}

meta = None
if managed_entry.ttl is not None:
# Aerospike TTL is in seconds
meta = {"ttl": int(managed_entry.ttl)}

self._put_record(aerospike_key, bins, meta=meta)

@override
async def _delete_managed_entry(self, *, key: str, collection: str) -> bool:
combo_key: str = compound_key(collection=collection, key=key)
aerospike_key = (self._namespace, self._set, combo_key)

return self._remove_record(aerospike_key)

@override
async def _get_collection_keys(self, *, collection: str, limit: int | None = None) -> list[str]:
limit = min(limit or DEFAULT_PAGE_SIZE, PAGE_LIMIT)

pattern = compound_key(collection=collection, key="")

keys: list[str] = []

def callback(record: tuple[Any, Any, Any]) -> None: # pyright: ignore[reportAny]
# Aerospike scan callback receives a 3-tuple: (key_tuple, metadata, bins)
# The key_tuple itself is (namespace, set, primary_key)
(key_tuple, _metadata, _bins) = record # pyright: ignore[reportAny]
primary_key = key_tuple[2] # Extract primary_key from the key_tuple
if isinstance(primary_key, str) and primary_key.startswith(pattern):
keys.append(primary_key)

# Scan the set for keys matching the collection
self._scan_set(callback)

# Extract just the key part from compound keys
result_keys = get_keys_from_compound_keys(compound_keys=keys, collection=collection)

return result_keys[:limit]

@override
async def _delete_store(self) -> bool:
"""Truncate the set (delete all records in the set)."""
# Aerospike truncate requires a timestamp parameter
# Using 0 means truncate everything
self._truncate_set()
return True

@override
async def _close(self) -> None:
"""Close the Aerospike connection."""
self._close_client()
Empty file.
74 changes: 74 additions & 0 deletions key-value/key-value-aio/tests/stores/aerospike/test_aerospike.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import contextlib
from collections.abc import AsyncGenerator

import pytest
from key_value.shared.stores.wait import async_wait_for_true
from typing_extensions import override

from key_value.aio.stores.aerospike import AerospikeStore
from key_value.aio.stores.base import BaseStore
from tests.conftest import docker_container, should_skip_docker_tests
from tests.stores.base import BaseStoreTests, ContextManagerStoreTestMixin

# Aerospike test configuration
AEROSPIKE_HOST = "localhost"
AEROSPIKE_PORT = 3000
AEROSPIKE_NAMESPACE = "test"
AEROSPIKE_SET = "kv-store-adapter-tests"

WAIT_FOR_AEROSPIKE_TIMEOUT = 30


async def ping_aerospike() -> bool:
try:
import aerospike # pyright: ignore[reportMissingImports]

config = {"hosts": [(AEROSPIKE_HOST, AEROSPIKE_PORT)]}
client = aerospike.client(config) # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType, reportAttributeAccessIssue]
client.connect() # pyright: ignore[reportUnknownMemberType]
client.close() # pyright: ignore[reportUnknownMemberType]
except Exception:
return False
else:
return True


class AerospikeFailedToStartError(Exception):
pass


@pytest.mark.skipif(should_skip_docker_tests(), reason="Docker is not available")
class TestAerospikeStore(ContextManagerStoreTestMixin, BaseStoreTests):
@pytest.fixture(autouse=True, scope="session")
async def setup_aerospike(self) -> AsyncGenerator[None, None]:
with docker_container("aerospike-test", "aerospike/aerospike-server:latest", {"3000": 3000}):
if not await async_wait_for_true(bool_fn=ping_aerospike, tries=30, wait_time=1):
msg = "Aerospike failed to start"
raise AerospikeFailedToStartError(msg)

yield

@override
@pytest.fixture
async def store(self, setup_aerospike: None) -> AerospikeStore:
import aerospike # pyright: ignore[reportMissingImports]

config = {"hosts": [(AEROSPIKE_HOST, AEROSPIKE_PORT)]}
client = aerospike.client(config) # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType, reportAttributeAccessIssue]
client.connect() # pyright: ignore[reportUnknownMemberType]

store = AerospikeStore(client=client, namespace=AEROSPIKE_NAMESPACE, set_name=AEROSPIKE_SET) # pyright: ignore[reportUnknownArgumentType]

# Clean up the set before tests
with contextlib.suppress(Exception):
client.truncate(AEROSPIKE_NAMESPACE, AEROSPIKE_SET, 0) # pyright: ignore[reportUnknownMemberType]

return store

@pytest.fixture
async def aerospike_store(self, store: AerospikeStore) -> AerospikeStore:
return store

@pytest.mark.skip(reason="Distributed Caches are unbounded")
@override
async def test_not_unbounded(self, store: BaseStore): ...
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# WARNING: this file is auto-generated by 'build_sync_library.py'
# from the original file '__init__.py'
# DO NOT CHANGE! Change the original file instead.
from key_value.sync.code_gen.stores.aerospike.store import AerospikeStore

__all__ = ["AerospikeStore"]
Loading