Skip to content

Commit 019bd59

Browse files
Add Aerospike store implementation
- Add AerospikeStore with full protocol compliance - Add native TTL support using Aerospike's meta parameter - Add Docker-based test suite - Update README.md to include Aerospike in backend list - Add aerospike>=14.0.0 as optional dependency - Generate sync version of Aerospike store Co-authored-by: William Easton <[email protected]>
1 parent 3b3def3 commit 019bd59

File tree

12 files changed

+569
-4
lines changed

12 files changed

+569
-4
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ This monorepo contains two libraries:
1717

1818
## Why use this library?
1919

20-
- **Multiple backends**: DynamoDB, Elasticsearch, Memcached, MongoDB, Redis,
21-
RocksDB, Valkey, and In-memory, Disk, etc
20+
- **Multiple backends**: Aerospike, DynamoDB, Elasticsearch, Memcached, MongoDB,
21+
Redis, RocksDB, Valkey, and In-memory, Disk, etc
2222
- **TTL support**: Automatic expiration handling across all store types
2323
- **Type-safe**: Full type hints with Protocol-based interfaces
2424
- **Adapters**: Pydantic model support, raise-on-missing behavior, etc
@@ -132,7 +132,7 @@ pip install py-key-value-aio[memory]
132132
pip install py-key-value-aio[disk]
133133
pip install py-key-value-aio[dynamodb]
134134
pip install py-key-value-aio[elasticsearch]
135-
# or: redis, mongodb, memcached, valkey, vault, registry, rocksdb, see below for all options
135+
# or: aerospike, redis, mongodb, memcached, valkey, vault, registry, rocksdb, see below for all options
136136
```
137137

138138
```python

key-value/key-value-aio/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ dynamodb = ["aioboto3>=13.3.0", "types-aiobotocore-dynamodb>=2.16.0"]
4545
keyring = ["keyring>=25.6.0"]
4646
keyring-linux = ["keyring>=25.6.0", "dbus-python>=1.4.0"]
4747
pydantic = ["pydantic>=2.11.9"]
48+
aerospike = ["aerospike>=14.0.0"]
4849
rocksdb = [
4950
"rocksdict>=0.3.24 ; python_version >= '3.12'", # RocksDB 0.3.24 is the first version to support Python 3.13
5051
"rocksdict>=0.3.2 ; python_version < '3.12'"
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from key_value.aio.stores.aerospike.store import AerospikeStore
2+
3+
__all__ = ["AerospikeStore"]
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
from typing import Any, overload
2+
3+
from key_value.shared.errors import DeserializationError
4+
from key_value.shared.type_checking.bear_spray import bear_spray
5+
from key_value.shared.utils.compound import compound_key, get_keys_from_compound_keys
6+
from key_value.shared.utils.managed_entry import ManagedEntry
7+
from key_value.shared.utils.serialization import BasicSerializationAdapter, SerializationAdapter
8+
from typing_extensions import override
9+
10+
from key_value.aio.stores.base import BaseContextManagerStore, BaseDestroyStore, BaseEnumerateKeysStore, BaseStore
11+
12+
try:
13+
import aerospike
14+
except ImportError as e:
15+
msg = "AerospikeStore requires py-key-value-aio[aerospike]"
16+
raise ImportError(msg) from e
17+
18+
DEFAULT_NAMESPACE = "test"
19+
DEFAULT_SET = "kv-store"
20+
DEFAULT_PAGE_SIZE = 10000
21+
PAGE_LIMIT = 10000
22+
23+
24+
class AerospikeStore(BaseDestroyStore, BaseEnumerateKeysStore, BaseContextManagerStore, BaseStore):
25+
"""Aerospike-based key-value store."""
26+
27+
_client: aerospike.Client
28+
_namespace: str
29+
_set: str
30+
_adapter: SerializationAdapter
31+
32+
@overload
33+
def __init__(
34+
self,
35+
*,
36+
client: aerospike.Client,
37+
namespace: str = DEFAULT_NAMESPACE,
38+
set_name: str = DEFAULT_SET,
39+
default_collection: str | None = None,
40+
) -> None: ...
41+
42+
@overload
43+
def __init__(
44+
self,
45+
*,
46+
hosts: list[tuple[str, int]] | None = None,
47+
namespace: str = DEFAULT_NAMESPACE,
48+
set_name: str = DEFAULT_SET,
49+
default_collection: str | None = None,
50+
) -> None: ...
51+
52+
@bear_spray
53+
def __init__(
54+
self,
55+
*,
56+
client: aerospike.Client | None = None,
57+
hosts: list[tuple[str, int]] | None = None,
58+
namespace: str = DEFAULT_NAMESPACE,
59+
set_name: str = DEFAULT_SET,
60+
default_collection: str | None = None,
61+
) -> None:
62+
"""Initialize the Aerospike store.
63+
64+
Args:
65+
client: An existing Aerospike client to use.
66+
hosts: List of (host, port) tuples. Defaults to [("localhost", 3000)].
67+
namespace: Aerospike namespace. Defaults to "test".
68+
set_name: Aerospike set. Defaults to "kv-store".
69+
default_collection: The default collection to use if no collection is provided.
70+
"""
71+
if client:
72+
self._client = client
73+
else:
74+
hosts = hosts or [("localhost", 3000)]
75+
config = {"hosts": hosts}
76+
self._client = aerospike.client(config)
77+
78+
self._namespace = namespace
79+
self._set = set_name
80+
81+
self._stable_api = True
82+
self._adapter = BasicSerializationAdapter(date_format="isoformat", value_format="dict")
83+
84+
super().__init__(default_collection=default_collection)
85+
86+
@override
87+
async def _setup(self) -> None:
88+
"""Connect to Aerospike."""
89+
self._client.connect()
90+
91+
@override
92+
async def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry | None:
93+
combo_key: str = compound_key(collection=collection, key=key)
94+
95+
aerospike_key = (self._namespace, self._set, combo_key)
96+
97+
try:
98+
(_key, _metadata, bins) = self._client.get(aerospike_key)
99+
except aerospike.exception.RecordNotFound:
100+
return None
101+
102+
json_value: str | None = bins.get("value")
103+
104+
if not isinstance(json_value, str):
105+
return None
106+
107+
try:
108+
return self._adapter.load_json(json_str=json_value)
109+
except DeserializationError:
110+
return None
111+
112+
@override
113+
async def _put_managed_entry(
114+
self,
115+
*,
116+
key: str,
117+
collection: str,
118+
managed_entry: ManagedEntry,
119+
) -> None:
120+
combo_key: str = compound_key(collection=collection, key=key)
121+
122+
aerospike_key = (self._namespace, self._set, combo_key)
123+
json_value: str = self._adapter.dump_json(entry=managed_entry, key=key, collection=collection)
124+
125+
bins = {"value": json_value}
126+
127+
meta = {}
128+
if managed_entry.ttl is not None:
129+
# Aerospike TTL is in seconds
130+
meta["ttl"] = int(managed_entry.ttl)
131+
132+
self._client.put(aerospike_key, bins, meta=meta)
133+
134+
@override
135+
async def _delete_managed_entry(self, *, key: str, collection: str) -> bool:
136+
combo_key: str = compound_key(collection=collection, key=key)
137+
138+
aerospike_key = (self._namespace, self._set, combo_key)
139+
140+
try:
141+
self._client.remove(aerospike_key)
142+
except aerospike.exception.RecordNotFound:
143+
return False
144+
else:
145+
return True
146+
147+
@override
148+
async def _get_collection_keys(self, *, collection: str, limit: int | None = None) -> list[str]:
149+
limit = min(limit or DEFAULT_PAGE_SIZE, PAGE_LIMIT)
150+
151+
pattern = compound_key(collection=collection, key="")
152+
153+
keys: list[str] = []
154+
155+
def callback(record: tuple[Any, Any, Any]) -> None: # pyright: ignore[reportAny]
156+
(_namespace, _set, primary_key, _bins) = record # pyright: ignore[reportAny]
157+
if isinstance(primary_key, str) and primary_key.startswith(pattern):
158+
keys.append(primary_key)
159+
160+
# Scan the set for keys matching the collection
161+
scan = self._client.scan(self._namespace, self._set)
162+
scan.foreach(callback)
163+
164+
# Extract just the key part from compound keys
165+
result_keys = get_keys_from_compound_keys(compound_keys=keys, collection=collection)
166+
167+
return result_keys[:limit]
168+
169+
@override
170+
async def _delete_store(self) -> bool:
171+
"""Truncate the set (delete all records in the set)."""
172+
# Aerospike truncate requires a timestamp parameter
173+
# Using 0 means truncate everything
174+
self._client.truncate(self._namespace, self._set, 0)
175+
return True
176+
177+
@override
178+
async def _close(self) -> None:
179+
"""Close the Aerospike connection."""
180+
self._client.close()

key-value/key-value-aio/tests/stores/aerospike/__init__.py

Whitespace-only changes.
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import contextlib
2+
from collections.abc import AsyncGenerator
3+
4+
import pytest
5+
from key_value.shared.stores.wait import async_wait_for_true
6+
from typing_extensions import override
7+
8+
from key_value.aio.stores.aerospike import AerospikeStore
9+
from key_value.aio.stores.base import BaseStore
10+
from tests.conftest import docker_container, should_skip_docker_tests
11+
from tests.stores.base import BaseStoreTests, ContextManagerStoreTestMixin
12+
13+
# Aerospike test configuration
14+
AEROSPIKE_HOST = "localhost"
15+
AEROSPIKE_PORT = 3000
16+
AEROSPIKE_NAMESPACE = "test"
17+
AEROSPIKE_SET = "kv-store-adapter-tests"
18+
19+
WAIT_FOR_AEROSPIKE_TIMEOUT = 30
20+
21+
22+
async def ping_aerospike() -> bool:
23+
try:
24+
import aerospike
25+
26+
config = {"hosts": [(AEROSPIKE_HOST, AEROSPIKE_PORT)]}
27+
client = aerospike.client(config)
28+
client.connect()
29+
client.close()
30+
except Exception:
31+
return False
32+
else:
33+
return True
34+
35+
36+
class AerospikeFailedToStartError(Exception):
37+
pass
38+
39+
40+
@pytest.mark.skipif(should_skip_docker_tests(), reason="Docker is not available")
41+
class TestAerospikeStore(ContextManagerStoreTestMixin, BaseStoreTests):
42+
@pytest.fixture(autouse=True, scope="session")
43+
async def setup_aerospike(self) -> AsyncGenerator[None, None]:
44+
with docker_container("aerospike-test", "aerospike/aerospike-server:latest", {"3000": 3000}):
45+
if not await async_wait_for_true(bool_fn=ping_aerospike, tries=30, wait_time=1):
46+
msg = "Aerospike failed to start"
47+
raise AerospikeFailedToStartError(msg)
48+
49+
yield
50+
51+
@override
52+
@pytest.fixture
53+
async def store(self, setup_aerospike: None) -> AerospikeStore:
54+
import aerospike
55+
56+
config = {"hosts": [(AEROSPIKE_HOST, AEROSPIKE_PORT)]}
57+
client = aerospike.client(config)
58+
client.connect()
59+
60+
store = AerospikeStore(client=client, namespace=AEROSPIKE_NAMESPACE, set_name=AEROSPIKE_SET)
61+
62+
# Clean up the set before tests
63+
with contextlib.suppress(Exception):
64+
client.truncate(AEROSPIKE_NAMESPACE, AEROSPIKE_SET, 0)
65+
66+
return store
67+
68+
@pytest.fixture
69+
async def aerospike_store(self, store: AerospikeStore) -> AerospikeStore:
70+
return store
71+
72+
@pytest.mark.skip(reason="Distributed Caches are unbounded")
73+
@override
74+
async def test_not_unbounded(self, store: BaseStore): ...
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# WARNING: this file is auto-generated by 'build_sync_library.py'
2+
# from the original file '__init__.py'
3+
# DO NOT CHANGE! Change the original file instead.
4+
from key_value.sync.code_gen.stores.aerospike.store import AerospikeStore
5+
6+
__all__ = ["AerospikeStore"]

0 commit comments

Comments
 (0)