Skip to content

Commit 47b0ef9

Browse files
fix: Address CodeRabbit review feedback and add index validation
- Elasticsearch:
  - Remove unused DEFAULT_MAPPING constant
  - Fix culling in JSON mode by storing timestamps at top level
  - Add storage mode mismatch detection with helpful error messages
  - Add index mapping validation on collection setup
- MongoDB:
  - Add datetime type validation when reading in native mode
  - Add updated_at field to JSON mode for consistency
  - Remove redundant collection and key fields from documents
  - Use $setOnInsert for immutable created_at field
  - Add TTL index validation for native storage mode

Both stores now validate that their configuration matches the existing index/collection setup and raise helpful errors when mismatches occur.

Co-authored-by: William Easton <[email protected]>
1 parent 3f1a9ce commit 47b0ef9

File tree

2 files changed

+116
-33
lines changed
  • key-value/key-value-aio/src/key_value/aio/stores

2 files changed

+116
-33
lines changed

key-value/key-value-aio/src/key_value/aio/stores/elasticsearch/store.py

Lines changed: 58 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -40,26 +40,6 @@
4040

4141
DEFAULT_INDEX_PREFIX = "kv_store"
4242

43-
DEFAULT_MAPPING = {
44-
"properties": {
45-
"created_at": {
46-
"type": "date",
47-
},
48-
"expires_at": {
49-
"type": "date",
50-
},
51-
"collection": {
52-
"type": "keyword",
53-
},
54-
"key": {
55-
"type": "keyword",
56-
},
57-
"value": {
58-
"type": "flattened",
59-
},
60-
},
61-
}
62-
6343
DEFAULT_PAGE_SIZE = 10000
6444
PAGE_LIMIT = 10000
6545

@@ -141,6 +121,8 @@ async def _setup_collection(self, *, collection: str) -> None:
141121
index_name = self._sanitize_index_name(collection=collection)
142122

143123
if await self._client.options(ignore_status=404).indices.exists(index=index_name):
124+
# Validate that existing index mapping matches the storage mode
125+
await self._validate_index_mapping(index_name=index_name, collection=collection)
144126
return
145127

146128
# Create mapping based on storage mode
@@ -167,6 +149,34 @@ async def _setup_collection(self, *, collection: str) -> None:
167149

168150
_ = await self._client.options(ignore_status=404).indices.create(index=index_name, mappings=mapping, settings={})
169151

152+
async def _validate_index_mapping(self, *, index_name: str, collection: str) -> None:
    """Validate that an existing index's mapping matches the configured storage mode.

    The ``value`` field is expected to be mapped as ``flattened`` when the store
    is configured with native storage and as ``keyword`` otherwise.

    Validation is best effort: if the mapping cannot be fetched or read, a
    warning is logged and the method returns without raising.

    Args:
        index_name: Name of the existing index whose mapping is inspected.
        collection: Collection name; used only in log and error messages.

    Raises:
        ValueError: If the ``value`` field type in the index mapping differs
            from the type expected for the configured storage mode.
    """
    import logging

    try:
        mapping_response = await self._client.indices.get_mapping(index=index_name)
        mappings = mapping_response.get(index_name, {}).get("mappings", {})
        value_field_type = mappings.get("properties", {}).get("value", {}).get("type")
    except ValueError:
        # ValueError has always propagated from this method; keep that contract.
        raise
    except Exception:
        # Do not block store setup when the mapping cannot be inspected, but
        # leave a trace so a skipped validation is visible to operators.
        logging.getLogger(__name__).warning(
            "Could not validate mapping of index '%s' for collection '%s'; skipping validation.",
            index_name,
            collection,
            exc_info=True,
        )
        return

    expected_type = "flattened" if self._native_storage else "keyword"
    if value_field_type == expected_type:
        return

    msg = (
        f"Index mapping mismatch for collection '{collection}': "
        f"index has 'value' field type '{value_field_type}', "
        f"but store is configured for '{expected_type}' (native_storage={self._native_storage}). "
        f"To fix this, either: 1) Use the correct storage mode when initializing the store, "
        f"or 2) Delete and recreate the index with the new mapping."
    )
    raise ValueError(msg)
179+
170180
def _sanitize_index_name(self, collection: str) -> str:
171181
return sanitize_string(
172182
value=self._index_prefix + "-" + collection,
@@ -198,7 +208,19 @@ async def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry
198208

199209
if self._native_storage:
200210
# Native storage mode: Get value as flattened object
201-
if not (value := source.get("value")) or not isinstance(value, dict):
211+
if not (value := source.get("value")):
212+
return None
213+
214+
# Detect if data is in JSON string format
215+
if isinstance(value, str):
216+
msg = (
217+
f"Data for key '{key}' appears to be in JSON string format, but store is configured "
218+
"for native_storage mode. This indicates a storage mode mismatch. "
219+
"You may need to migrate existing data or use the correct storage mode."
220+
)
221+
raise ValueError(msg)
222+
223+
if not isinstance(value, dict):
202224
return None
203225

204226
created_at: datetime | None = try_parse_datetime_str(value=source.get("created_at"))
@@ -213,6 +235,15 @@ async def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry
213235
# JSON string mode: Get value as JSON string and parse it
214236
json_value: str | None = source.get("value")
215237

238+
# Detect if data is in native object format
239+
if isinstance(json_value, dict):
240+
msg = (
241+
f"Data for key '{key}' appears to be in native object format, but store is configured "
242+
"for JSON string mode. This indicates a storage mode mismatch. "
243+
"You may need to migrate existing data or use the correct storage mode."
244+
)
245+
raise ValueError(msg)
246+
216247
if not isinstance(json_value, str):
217248
return None
218249

@@ -259,6 +290,12 @@ async def _put_managed_entry(
259290
"value": managed_entry.to_json(), # Store as JSON string
260291
}
261292

293+
# Store timestamps at top level for culling to work
294+
if managed_entry.created_at:
295+
document["created_at"] = managed_entry.created_at.isoformat()
296+
if managed_entry.expires_at:
297+
document["expires_at"] = managed_entry.expires_at.isoformat()
298+
262299
_ = await self._client.index(
263300
index=self._sanitize_index_name(collection=collection),
264301
id=self._sanitize_document_id(key=combo_key),

key-value/key-value-aio/src/key_value/aio/stores/mongodb/store.py

Lines changed: 58 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,8 @@ async def _setup_collection(self, *, collection: str) -> None:
143143

144144
if matching_collections:
145145
self._collections_by_name[collection] = self._db[collection]
146+
# Validate indexes for existing collection
147+
await self._validate_collection_indexes(collection=collection)
146148
return
147149

148150
new_collection: AsyncCollection[dict[str, Any]] = await self._db.create_collection(name=collection)
@@ -155,6 +157,32 @@ async def _setup_collection(self, *, collection: str) -> None:
155157

156158
self._collections_by_name[collection] = new_collection
157159

160+
async def _validate_collection_indexes(self, *, collection: str) -> None:
    """Validate that an existing collection's indexes match the storage mode.

    When the store is configured with native storage, the collection must have
    a TTL index on ``expires_at`` (an index whose key includes ``expires_at``
    and which defines ``expireAfterSeconds``).

    Validation is best effort: if the indexes cannot be listed or read, a
    warning is logged and the method returns without raising.

    Args:
        collection: Name of the collection, as keyed in
            ``self._collections_by_name``.

    Raises:
        ValueError: If native storage is configured and no TTL index on
            ``expires_at`` exists.
    """
    import inspect
    import logging

    try:
        coll = self._collections_by_name[collection]

        # PyMongo's AsyncCollection.list_indexes() is a coroutine function, so
        # its result must be awaited before to_list() can be called on the
        # cursor. A driver that returns the cursor directly is accepted too.
        cursor = coll.list_indexes()
        if inspect.isawaitable(cursor):
            cursor = await cursor
        indexes = await cursor.to_list(length=None)

        # Check for TTL index on expires_at
        has_ttl_index = any(
            idx.get("key", {}).get("expires_at") is not None and idx.get("expireAfterSeconds") is not None
            for idx in indexes
        )
    except ValueError:
        # ValueError has always propagated from this method; keep that contract.
        raise
    except Exception:
        # Do not block store setup when indexes cannot be inspected, but leave
        # a trace so a skipped validation is visible to operators.
        logging.getLogger(__name__).warning(
            "Could not validate indexes of collection '%s'; skipping validation.",
            collection,
            exc_info=True,
        )
        return

    if self._native_storage and not has_ttl_index:
        msg = (
            f"Collection '{collection}' is missing TTL index on 'expires_at' field, "
            f"but store is configured for native_storage mode. "
            f"To fix this, either: 1) Recreate the collection with native_storage=True, "
            f"or 2) Manually create the TTL index: db.{collection}.createIndex({{expires_at: 1}}, {{expireAfterSeconds: 0}})"
        )
        raise ValueError(msg)
185+
158186
@override
159187
async def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry | None:
160188
collection = self._sanitize_collection_name(collection=collection)
@@ -171,10 +199,24 @@ async def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry
171199
if not isinstance(value, dict):
172200
return None
173201

174-
# Parse datetime objects directly
202+
# Parse datetime objects directly and validate types
175203
created_at: datetime | None = doc.get("created_at")
176204
expires_at: datetime | None = doc.get("expires_at")
177205

206+
# Validate datetime types to detect storage mode mismatches
207+
if created_at is not None and not isinstance(created_at, datetime):
208+
msg = (
209+
f"Data for key '{key}' has invalid created_at type: expected datetime but got {type(created_at).__name__}. "
210+
f"This may indicate a storage mode mismatch."
211+
)
212+
raise TypeError(msg)
213+
if expires_at is not None and not isinstance(expires_at, datetime):
214+
msg = (
215+
f"Data for key '{key}' has invalid expires_at type: expected datetime but got {type(expires_at).__name__}. "
216+
f"This may indicate a storage mode mismatch."
217+
)
218+
raise TypeError(msg)
219+
178220
return ManagedEntry(
179221
value=value,
180222
created_at=created_at,
@@ -201,35 +243,39 @@ async def _put_managed_entry(
201243

202244
if self._native_storage:
203245
# Native storage mode: Store value as BSON document
204-
document: dict[str, Any] = {
205-
"collection": collection,
206-
"key": key,
246+
set_fields: dict[str, Any] = {
207247
"value": managed_entry.value, # Store as BSON document
208248
"updated_at": now(),
209249
}
210250

211-
# Store as datetime objects
251+
set_on_insert_fields: dict[str, Any] = {}
252+
253+
# Store as datetime objects (use $setOnInsert for immutable fields)
212254
if managed_entry.created_at:
213-
document["created_at"] = managed_entry.created_at
255+
set_on_insert_fields["created_at"] = managed_entry.created_at
214256
if managed_entry.expires_at:
215-
document["expires_at"] = managed_entry.expires_at
257+
# expires_at can change, so use $set
258+
set_fields["expires_at"] = managed_entry.expires_at
259+
260+
update_doc: dict[str, Any] = {"$set": set_fields}
261+
if set_on_insert_fields:
262+
update_doc["$setOnInsert"] = set_on_insert_fields
216263

217264
_ = await self._collections_by_name[collection].update_one(
218265
filter={"key": key},
219-
update={"$set": document},
266+
update=update_doc,
220267
upsert=True,
221268
)
222269
else:
223270
# JSON string mode: Store value as JSON string
224-
document: dict[str, Any] = {
225-
"collection": collection,
226-
"key": key,
271+
set_fields: dict[str, Any] = {
227272
"value": managed_entry.to_json(), # Store as JSON string
273+
"updated_at": now(),
228274
}
229275

230276
_ = await self._collections_by_name[collection].update_one(
231277
filter={"key": key},
232-
update={"$set": document},
278+
update={"$set": set_fields},
233279
upsert=True,
234280
)
235281

0 commit comments

Comments
 (0)