Skip to content

Commit d22a52d

Browse files
Refactor ManagedEntry serialization with adapter pattern
This commit implements a comprehensive refactoring of ManagedEntry serialization to address inconsistencies across stores and wrappers. Changes: - Created SerializationAdapter base class with three implementations: - FullJsonAdapter: For Redis/Valkey (full JSON string) - StringifiedDictAdapter: For MongoDB (dict with stringified value) - ElasticsearchAdapter: For Elasticsearch (native or stringified) - Updated stores to use serialization adapters: - MongoDB: Uses StringifiedDictAdapter - Elasticsearch: Uses ElasticsearchAdapter with native_storage flag - Redis: Uses FullJsonAdapter - Fixed wrapper size calculation inefficiency: - Added estimate_serialized_size() utility function - Updated CompressionWrapper and LimitSizeWrapper to use utility - Eliminates unnecessary ManagedEntry instantiation - Removed deprecated helper functions: - Removed managed_entry_to_document/document_to_managed_entry from MongoDB - Removed managed_entry_to_document/source_to_managed_entry from Elasticsearch - Removed managed_entry_to_json/json_to_managed_entry from Redis - Updated test files to use new adapter-based approach Benefits: - Consistent serialization interface across all stores - Separation of concerns (serialization logic in adapters) - Easier to add new stores or modify serialization - More efficient size calculations in wrappers - Better testability and maintainability Related to issue #165 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: William Easton <[email protected]>
1 parent 23f4a9c commit d22a52d

File tree

18 files changed

+333
-300
lines changed

18 files changed

+333
-300
lines changed

key-value/key-value-aio/src/key_value/aio/stores/elasticsearch/store.py

Lines changed: 18 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,15 @@
66
from elastic_transport import ObjectApiResponse
77
from elastic_transport import SerializationError as ElasticsearchSerializationError
88
from key_value.shared.errors import DeserializationError, SerializationError
9-
from key_value.shared.utils.managed_entry import ManagedEntry, load_from_json, verify_dict
9+
from key_value.shared.utils.managed_entry import ManagedEntry
1010
from key_value.shared.utils.sanitize import (
1111
ALPHANUMERIC_CHARACTERS,
1212
LOWERCASE_ALPHABET,
1313
NUMBERS,
1414
sanitize_string,
1515
)
16-
from key_value.shared.utils.time_to_live import now_as_epoch, try_parse_datetime_str
16+
from key_value.shared.utils.serialization import ElasticsearchAdapter, SerializationAdapter
17+
from key_value.shared.utils.time_to_live import now_as_epoch
1718
from typing_extensions import override
1819

1920
from key_value.aio.stores.base import (
@@ -84,54 +85,6 @@
8485
ALLOWED_INDEX_CHARACTERS: str = LOWERCASE_ALPHABET + NUMBERS + "_" + "-" + "."
8586

8687

87-
def managed_entry_to_document(collection: str, key: str, managed_entry: ManagedEntry, *, native_storage: bool = False) -> dict[str, Any]:
88-
document: dict[str, Any] = {"collection": collection, "key": key, "value": {}}
89-
90-
# Store in appropriate field based on mode
91-
if native_storage:
92-
document["value"]["flattened"] = managed_entry.value_as_dict
93-
else:
94-
document["value"]["string"] = managed_entry.value_as_json
95-
96-
if managed_entry.created_at:
97-
document["created_at"] = managed_entry.created_at.isoformat()
98-
if managed_entry.expires_at:
99-
document["expires_at"] = managed_entry.expires_at.isoformat()
100-
101-
return document
102-
103-
104-
def source_to_managed_entry(source: dict[str, Any]) -> ManagedEntry:
105-
value: dict[str, Any] = {}
106-
107-
raw_value = source.get("value")
108-
109-
# Try flattened field first, fall back to string field
110-
if not raw_value or not isinstance(raw_value, dict):
111-
msg = "Value field not found or invalid type"
112-
raise DeserializationError(msg)
113-
114-
if value_flattened := raw_value.get("flattened"): # pyright: ignore[reportUnknownVariableType, reportUnknownMemberType]
115-
value = verify_dict(obj=value_flattened)
116-
elif value_str := raw_value.get("string"): # pyright: ignore[reportUnknownVariableType, reportUnknownMemberType]
117-
if not isinstance(value_str, str):
118-
msg = "Value in `value` field is not a string"
119-
raise DeserializationError(msg)
120-
value = load_from_json(value_str)
121-
else:
122-
msg = "Value field not found or invalid type"
123-
raise DeserializationError(msg)
124-
125-
created_at: datetime | None = try_parse_datetime_str(value=source.get("created_at"))
126-
expires_at: datetime | None = try_parse_datetime_str(value=source.get("expires_at"))
127-
128-
return ManagedEntry(
129-
value=value,
130-
created_at=created_at,
131-
expires_at=expires_at,
132-
)
133-
134-
13588
class ElasticsearchStore(
13689
BaseEnumerateCollectionsStore, BaseEnumerateKeysStore, BaseDestroyCollectionStore, BaseCullStore, BaseContextManagerStore, BaseStore
13790
):
@@ -145,6 +98,8 @@ class ElasticsearchStore(
14598

14699
_native_storage: bool
147100

101+
_adapter: SerializationAdapter
102+
148103
@overload
149104
def __init__(
150105
self,
@@ -208,6 +163,7 @@ def __init__(
208163
self._index_prefix = index_prefix
209164
self._native_storage = native_storage
210165
self._is_serverless = False
166+
self._adapter = ElasticsearchAdapter(native_storage=native_storage)
211167

212168
super().__init__(default_collection=default_collection)
213169

@@ -260,7 +216,7 @@ async def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry
260216
return None
261217

262218
try:
263-
return source_to_managed_entry(source=source)
219+
return self._adapter.from_storage(data=source)
264220
except DeserializationError:
265221
return None
266222

@@ -293,7 +249,7 @@ async def _get_managed_entries(self, *, collection: str, keys: Sequence[str]) ->
293249
continue
294250

295251
try:
296-
entries_by_id[doc_id] = source_to_managed_entry(source=source)
252+
entries_by_id[doc_id] = self._adapter.from_storage(data=source)
297253
except DeserializationError as e:
298254
logger.error(
299255
"Failed to deserialize Elasticsearch document in batch operation",
@@ -324,9 +280,11 @@ async def _put_managed_entry(
324280
index_name: str = self._sanitize_index_name(collection=collection)
325281
document_id: str = self._sanitize_document_id(key=key)
326282

327-
document: dict[str, Any] = managed_entry_to_document(
328-
collection=collection, key=key, managed_entry=managed_entry, native_storage=self._native_storage
329-
)
283+
document = self._adapter.to_storage(collection=collection, key=key, entry=managed_entry)
284+
285+
if not isinstance(document, dict):
286+
msg = "Elasticsearch adapter must return dict"
287+
raise TypeError(msg)
330288

331289
try:
332290
_ = await self._client.index(
@@ -364,9 +322,11 @@ async def _put_managed_entries(
364322

365323
index_action: dict[str, Any] = new_bulk_action(action="index", index=index_name, document_id=document_id)
366324

367-
document: dict[str, Any] = managed_entry_to_document(
368-
collection=collection, key=key, managed_entry=managed_entry, native_storage=self._native_storage
369-
)
325+
document = self._adapter.to_storage(collection=collection, key=key, entry=managed_entry)
326+
327+
if not isinstance(document, dict):
328+
msg = "Elasticsearch adapter must return dict"
329+
raise TypeError(msg)
370330

371331
operations.extend([index_action, document])
372332
try:

key-value/key-value-aio/src/key_value/aio/stores/mongodb/store.py

Lines changed: 15 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from key_value.shared.utils.managed_entry import ManagedEntry
66
from key_value.shared.utils.sanitize import ALPHANUMERIC_CHARACTERS, sanitize_string
7+
from key_value.shared.utils.serialization import SerializationAdapter, StringifiedDictAdapter
78
from typing_extensions import Self, override
89

910
from key_value.aio.stores.base import BaseContextManagerStore, BaseDestroyCollectionStore, BaseEnumerateCollectionsStore, BaseStore
@@ -33,48 +34,13 @@
3334
COLLECTION_ALLOWED_CHARACTERS = ALPHANUMERIC_CHARACTERS + "_"
3435

3536

36-
def document_to_managed_entry(document: dict[str, Any]) -> ManagedEntry:
37-
"""Convert a MongoDB document back to a ManagedEntry.
38-
39-
This function deserializes a MongoDB document (created by `managed_entry_to_document`) back to a
40-
ManagedEntry object, parsing the stringified value field and preserving all metadata.
41-
42-
Args:
43-
document: The MongoDB document to convert.
44-
45-
Returns:
46-
A ManagedEntry object reconstructed from the document.
47-
"""
48-
return ManagedEntry.from_dict(data=document, stringified_value=True)
49-
50-
51-
def managed_entry_to_document(key: str, managed_entry: ManagedEntry) -> dict[str, Any]:
52-
"""Convert a ManagedEntry to a MongoDB document for storage.
53-
54-
This function serializes a ManagedEntry to a MongoDB document format, including the key and all
55-
metadata (TTL, creation, and expiration timestamps). The value is stringified to ensure proper
56-
storage in MongoDB. The serialization is designed to preserve all entry information for round-trip
57-
conversion back to a ManagedEntry.
58-
59-
Args:
60-
key: The key associated with this entry.
61-
managed_entry: The ManagedEntry to serialize.
62-
63-
Returns:
64-
A MongoDB document dict containing the key, value, and all metadata.
65-
"""
66-
return {
67-
"key": key,
68-
**managed_entry.to_dict(include_metadata=True, include_expiration=True, include_creation=True, stringify_value=True),
69-
}
70-
71-
7237
class MongoDBStore(BaseEnumerateCollectionsStore, BaseDestroyCollectionStore, BaseContextManagerStore, BaseStore):
7338
"""MongoDB-based key-value store using Motor (async MongoDB driver)."""
7439

7540
_client: AsyncMongoClient[dict[str, Any]]
7641
_db: AsyncDatabase[dict[str, Any]]
7742
_collections_by_name: dict[str, AsyncCollection[dict[str, Any]]]
43+
_adapter: SerializationAdapter
7844

7945
@overload
8046
def __init__(
@@ -131,6 +97,7 @@ def __init__(
13197

13298
self._db = self._client[db_name]
13399
self._collections_by_name = {}
100+
self._adapter = StringifiedDictAdapter()
134101

135102
super().__init__(default_collection=default_collection)
136103

@@ -187,7 +154,7 @@ async def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry
187154
sanitized_collection = self._sanitize_collection_name(collection=collection)
188155

189156
if doc := await self._collections_by_name[sanitized_collection].find_one(filter={"key": key}):
190-
return ManagedEntry.from_dict(data=doc, stringified_value=True)
157+
return self._adapter.from_storage(data=doc)
191158

192159
return None
193160

@@ -205,7 +172,7 @@ async def _get_managed_entries(self, *, collection: str, keys: Sequence[str]) ->
205172

206173
async for doc in cursor:
207174
if key := doc.get("key"):
208-
managed_entries_by_key[key] = document_to_managed_entry(document=doc)
175+
managed_entries_by_key[key] = self._adapter.from_storage(data=doc)
209176

210177
return [managed_entries_by_key[key] for key in keys]
211178

@@ -217,7 +184,11 @@ async def _put_managed_entry(
217184
collection: str,
218185
managed_entry: ManagedEntry,
219186
) -> None:
220-
mongo_doc: dict[str, Any] = managed_entry_to_document(key=key, managed_entry=managed_entry)
187+
mongo_doc = self._adapter.to_storage(key=key, entry=managed_entry)
188+
189+
if not isinstance(mongo_doc, dict):
190+
msg = "MongoDB adapter must return dict"
191+
raise TypeError(msg)
221192

222193
sanitized_collection = self._sanitize_collection_name(collection=collection)
223194

@@ -248,7 +219,11 @@ async def _put_managed_entries(
248219

249220
operations: list[UpdateOne] = []
250221
for key, managed_entry in zip(keys, managed_entries, strict=True):
251-
mongo_doc: dict[str, Any] = managed_entry_to_document(key=key, managed_entry=managed_entry)
222+
mongo_doc = self._adapter.to_storage(key=key, entry=managed_entry)
223+
224+
if not isinstance(mongo_doc, dict):
225+
msg = "MongoDB adapter must return dict"
226+
raise TypeError(msg)
252227

253228
operations.append(
254229
UpdateOne(

key-value/key-value-aio/src/key_value/aio/stores/redis/store.py

Lines changed: 22 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from key_value.shared.type_checking.bear_spray import bear_spray
77
from key_value.shared.utils.compound import compound_key, get_keys_from_compound_keys
88
from key_value.shared.utils.managed_entry import ManagedEntry
9+
from key_value.shared.utils.serialization import FullJsonAdapter, SerializationAdapter
910
from typing_extensions import override
1011

1112
from key_value.aio.stores.base import BaseContextManagerStore, BaseDestroyStore, BaseEnumerateKeysStore, BaseStore
@@ -20,41 +21,11 @@
2021
PAGE_LIMIT = 10000
2122

2223

23-
def managed_entry_to_json(managed_entry: ManagedEntry) -> str:
24-
"""Convert a ManagedEntry to a JSON string for Redis storage.
25-
26-
This function serializes a ManagedEntry to JSON format including all metadata (TTL, creation,
27-
and expiration timestamps). The serialization is designed to preserve all entry information
28-
for round-trip conversion back to a ManagedEntry.
29-
30-
Args:
31-
managed_entry: The ManagedEntry to serialize.
32-
33-
Returns:
34-
A JSON string representation of the ManagedEntry with full metadata.
35-
"""
36-
return managed_entry.to_json(include_metadata=True, include_expiration=True, include_creation=True)
37-
38-
39-
def json_to_managed_entry(json_str: str) -> ManagedEntry:
40-
"""Convert a JSON string from Redis storage back to a ManagedEntry.
41-
42-
This function deserializes a JSON string (created by `managed_entry_to_json`) back to a
43-
ManagedEntry object, preserving all metadata including TTL, creation, and expiration timestamps.
44-
45-
Args:
46-
json_str: The JSON string to deserialize.
47-
48-
Returns:
49-
A ManagedEntry object reconstructed from the JSON string.
50-
"""
51-
return ManagedEntry.from_json(json_str=json_str, includes_metadata=True)
52-
53-
5424
class RedisStore(BaseDestroyStore, BaseEnumerateKeysStore, BaseContextManagerStore, BaseStore):
5525
"""Redis-based key-value store."""
5626

5727
_client: Redis
28+
_adapter: SerializationAdapter
5829

5930
@overload
6031
def __init__(self, *, client: Redis, default_collection: str | None = None) -> None: ...
@@ -111,6 +82,7 @@ def __init__(
11182
)
11283

11384
self._stable_api = True
85+
self._adapter = FullJsonAdapter()
11486

11587
super().__init__(default_collection=default_collection)
11688

@@ -123,7 +95,7 @@ async def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry
12395
if not isinstance(redis_response, str):
12496
return None
12597

126-
managed_entry: ManagedEntry = json_to_managed_entry(json_str=redis_response)
98+
managed_entry: ManagedEntry = self._adapter.from_storage(data=redis_response)
12799

128100
return managed_entry
129101

@@ -139,7 +111,7 @@ async def _get_managed_entries(self, *, collection: str, keys: Sequence[str]) ->
139111
entries: list[ManagedEntry | None] = []
140112
for redis_response in redis_responses:
141113
if isinstance(redis_response, str):
142-
entries.append(json_to_managed_entry(json_str=redis_response))
114+
entries.append(self._adapter.from_storage(data=redis_response))
143115
else:
144116
entries.append(None)
145117

@@ -155,7 +127,11 @@ async def _put_managed_entry(
155127
) -> None:
156128
combo_key: str = compound_key(collection=collection, key=key)
157129

158-
json_value: str = managed_entry_to_json(managed_entry=managed_entry)
130+
json_value = self._adapter.to_storage(key=key, entry=managed_entry)
131+
132+
if not isinstance(json_value, str):
133+
msg = "Redis adapter must return str"
134+
raise TypeError(msg)
159135

160136
if managed_entry.ttl is not None:
161137
# Redis does not support <= 0 TTLs
@@ -181,10 +157,13 @@ async def _put_managed_entries(
181157

182158
if ttl is None:
183159
# If there is no TTL, we can just do a simple mset
184-
mapping: dict[str, str] = {
185-
compound_key(collection=collection, key=key): managed_entry_to_json(managed_entry=managed_entry)
186-
for key, managed_entry in zip(keys, managed_entries, strict=True)
187-
}
160+
mapping: dict[str, str] = {}
161+
for key, managed_entry in zip(keys, managed_entries, strict=True):
162+
json_value = self._adapter.to_storage(key=key, entry=managed_entry)
163+
if not isinstance(json_value, str):
164+
msg = "Redis adapter must return str"
165+
raise TypeError(msg)
166+
mapping[compound_key(collection=collection, key=key)] = json_value
188167

189168
await self._client.mset(mapping=mapping)
190169

@@ -198,7 +177,11 @@ async def _put_managed_entries(
198177

199178
for key, managed_entry in zip(keys, managed_entries, strict=True):
200179
combo_key: str = compound_key(collection=collection, key=key)
201-
json_value: str = managed_entry_to_json(managed_entry=managed_entry)
180+
json_value = self._adapter.to_storage(key=key, entry=managed_entry)
181+
182+
if not isinstance(json_value, str):
183+
msg = "Redis adapter must return str"
184+
raise TypeError(msg)
202185

203186
pipeline.setex(name=combo_key, time=ttl_seconds, value=json_value)
204187

key-value/key-value-aio/src/key_value/aio/wrappers/compression/wrapper.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from collections.abc import Mapping, Sequence
55
from typing import Any, SupportsFloat
66

7-
from key_value.shared.utils.managed_entry import ManagedEntry
7+
from key_value.shared.utils.managed_entry import estimate_serialized_size
88
from typing_extensions import override
99

1010
from key_value.aio.protocols.key_value import AsyncKeyValue
@@ -56,7 +56,7 @@ def _should_compress(self, value: dict[str, Any]) -> bool:
5656
return False
5757

5858
# Check size
59-
item_size: int = len(ManagedEntry(value=value).to_json())
59+
item_size: int = estimate_serialized_size(value=value)
6060
return item_size >= self.min_size_to_compress
6161

6262
def _compress_value(self, value: dict[str, Any]) -> dict[str, Any]:

0 commit comments

Comments
 (0)