Skip to content

Commit 81ab77a

Browse files
committed
Fix Elasticsearch and Mongo collection handling
1 parent 02cd882 commit 81ab77a

File tree

4 files changed

+24
-34
lines changed
  • key-value

4 files changed

+24
-34
lines changed

key-value/key-value-aio/src/key_value/aio/stores/elasticsearch/store.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,9 +211,15 @@ def __init__(
211211
self._native_storage = native_storage
212212
self._is_serverless = False
213213

214+
# We have 240 characters to work with
215+
# We need to account for the index prefix and the hyphen.
216+
max_index_length = MAX_INDEX_LENGTH - (len(self._index_prefix) + 1)
217+
214218
self._serializer = ElasticsearchSerializationAdapter(native_storage=native_storage)
215219
self._collection_sanitization = HybridSanitizationStrategy(
216-
replacement_character="_", max_length=MAX_INDEX_LENGTH, allowed_characters=ALLOWED_INDEX_CHARACTERS
220+
replacement_character="_",
221+
max_length=max_index_length,
222+
allowed_characters=ALLOWED_INDEX_CHARACTERS,
217223
)
218224
self._key_sanitization = AlwaysHashStrategy()
219225

key-value/key-value-aio/src/key_value/aio/stores/mongodb/store.py

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ async def _setup_collection(self, *, collection: str) -> None:
205205
self._collections_by_name[collection] = self._db[sanitized_collection]
206206
return
207207

208-
new_collection: AsyncCollection[dict[str, Any]] = await self._db.create_collection(name=collection)
208+
new_collection: AsyncCollection[dict[str, Any]] = await self._db.create_collection(name=sanitized_collection)
209209

210210
# Index for efficient key lookups
211211
_ = await new_collection.create_index(keys="key")
@@ -217,9 +217,7 @@ async def _setup_collection(self, *, collection: str) -> None:
217217

218218
@override
219219
async def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry | None:
220-
sanitized_collection = self._sanitize_collection(collection=collection)
221-
222-
if doc := await self._collections_by_name[sanitized_collection].find_one(filter={"key": key}):
220+
if doc := await self._collections_by_name[collection].find_one(filter={"key": key}):
223221
try:
224222
return self._adapter.load_dict(data=doc)
225223
except DeserializationError:
@@ -232,10 +230,8 @@ async def _get_managed_entries(self, *, collection: str, keys: Sequence[str]) ->
232230
if not keys:
233231
return []
234232

235-
sanitized_collection = self._sanitize_collection(collection=collection)
236-
237233
# Use find with $in operator to get multiple documents at once
238-
cursor = self._collections_by_name[sanitized_collection].find(filter={"key": {"$in": keys}})
234+
cursor = self._collections_by_name[collection].find(filter={"key": {"$in": keys}})
239235

240236
managed_entries_by_key: dict[str, ManagedEntry | None] = dict.fromkeys(keys)
241237

@@ -258,12 +254,10 @@ async def _put_managed_entry(
258254
) -> None:
259255
mongo_doc = self._adapter.dump_dict(entry=managed_entry)
260256

261-
sanitized_collection = self._sanitize_collection(collection=collection)
262-
263257
try:
264258
# Ensure that the value is serializable to JSON
265259
_ = managed_entry.value_as_json
266-
_ = await self._collections_by_name[sanitized_collection].update_one(
260+
_ = await self._collections_by_name[collection].update_one(
267261
filter={"key": key},
268262
update={
269263
"$set": {
@@ -291,8 +285,6 @@ async def _put_managed_entries(
291285
if not keys:
292286
return
293287

294-
sanitized_collection = self._sanitize_collection(collection=collection)
295-
296288
operations: list[UpdateOne] = []
297289
for key, managed_entry in zip(keys, managed_entries, strict=True):
298290
mongo_doc = self._adapter.dump_dict(entry=managed_entry)
@@ -311,13 +303,11 @@ async def _put_managed_entries(
311303
)
312304
)
313305

314-
_ = await self._collections_by_name[sanitized_collection].bulk_write(operations) # pyright: ignore[reportUnknownMemberType]
306+
_ = await self._collections_by_name[collection].bulk_write(operations) # pyright: ignore[reportUnknownMemberType]
315307

316308
@override
317309
async def _delete_managed_entry(self, *, key: str, collection: str) -> bool:
318-
sanitized_collection = self._sanitize_collection(collection=collection)
319-
320-
result: DeleteResult = await self._collections_by_name[sanitized_collection].delete_one(filter={"key": key})
310+
result: DeleteResult = await self._collections_by_name[collection].delete_one(filter={"key": key})
321311
return bool(result.deleted_count)
322312

323313
@override

key-value/key-value-sync/src/key_value/sync/code_gen/stores/elasticsearch/store.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,9 +180,13 @@ def __init__(
180180
self._native_storage = native_storage
181181
self._is_serverless = False
182182

183+
# We have 240 characters to work with
184+
# We need to account for the index prefix and the hyphen.
185+
max_index_length = MAX_INDEX_LENGTH - (len(self._index_prefix) + 1)
186+
183187
self._serializer = ElasticsearchSerializationAdapter(native_storage=native_storage)
184188
self._collection_sanitization = HybridSanitizationStrategy(
185-
replacement_character="_", max_length=MAX_INDEX_LENGTH, allowed_characters=ALLOWED_INDEX_CHARACTERS
189+
replacement_character="_", max_length=max_index_length, allowed_characters=ALLOWED_INDEX_CHARACTERS
186190
)
187191
self._key_sanitization = AlwaysHashStrategy()
188192

key-value/key-value-sync/src/key_value/sync/code_gen/stores/mongodb/store.py

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ def _setup_collection(self, *, collection: str) -> None:
212212
self._collections_by_name[collection] = self._db[sanitized_collection]
213213
return
214214

215-
new_collection: Collection[dict[str, Any]] = self._db.create_collection(name=collection)
215+
new_collection: Collection[dict[str, Any]] = self._db.create_collection(name=sanitized_collection)
216216

217217
# Index for efficient key lookups
218218
_ = new_collection.create_index(keys="key")
@@ -224,9 +224,7 @@ def _setup_collection(self, *, collection: str) -> None:
224224

225225
@override
226226
def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry | None:
227-
sanitized_collection = self._sanitize_collection(collection=collection)
228-
229-
if doc := self._collections_by_name[sanitized_collection].find_one(filter={"key": key}):
227+
if doc := self._collections_by_name[collection].find_one(filter={"key": key}):
230228
try:
231229
return self._adapter.load_dict(data=doc)
232230
except DeserializationError:
@@ -239,10 +237,8 @@ def _get_managed_entries(self, *, collection: str, keys: Sequence[str]) -> list[
239237
if not keys:
240238
return []
241239

242-
sanitized_collection = self._sanitize_collection(collection=collection)
243-
244240
# Use find with $in operator to get multiple documents at once
245-
cursor = self._collections_by_name[sanitized_collection].find(filter={"key": {"$in": keys}})
241+
cursor = self._collections_by_name[collection].find(filter={"key": {"$in": keys}})
246242

247243
managed_entries_by_key: dict[str, ManagedEntry | None] = dict.fromkeys(keys)
248244

@@ -259,12 +255,10 @@ def _get_managed_entries(self, *, collection: str, keys: Sequence[str]) -> list[
259255
def _put_managed_entry(self, *, key: str, collection: str, managed_entry: ManagedEntry) -> None:
260256
mongo_doc = self._adapter.dump_dict(entry=managed_entry)
261257

262-
sanitized_collection = self._sanitize_collection(collection=collection)
263-
264258
try:
265259
# Ensure that the value is serializable to JSON
266260
_ = managed_entry.value_as_json
267-
_ = self._collections_by_name[sanitized_collection].update_one(
261+
_ = self._collections_by_name[collection].update_one(
268262
filter={"key": key}, update={"$set": {"key": key, **mongo_doc}}, upsert=True
269263
)
270264
except InvalidDocument as e:
@@ -285,8 +279,6 @@ def _put_managed_entries(
285279
if not keys:
286280
return
287281

288-
sanitized_collection = self._sanitize_collection(collection=collection)
289-
290282
operations: list[UpdateOne] = []
291283
for key, managed_entry in zip(keys, managed_entries, strict=True):
292284
mongo_doc = self._adapter.dump_dict(entry=managed_entry)
@@ -295,13 +287,11 @@ def _put_managed_entries(
295287
UpdateOne(filter={"key": key}, update={"$set": {"collection": collection, "key": key, **mongo_doc}}, upsert=True)
296288
)
297289

298-
_ = self._collections_by_name[sanitized_collection].bulk_write(operations) # pyright: ignore[reportUnknownMemberType]
290+
_ = self._collections_by_name[collection].bulk_write(operations) # pyright: ignore[reportUnknownMemberType]
299291

300292
@override
301293
def _delete_managed_entry(self, *, key: str, collection: str) -> bool:
302-
sanitized_collection = self._sanitize_collection(collection=collection)
303-
304-
result: DeleteResult = self._collections_by_name[sanitized_collection].delete_one(filter={"key": key})
294+
result: DeleteResult = self._collections_by_name[collection].delete_one(filter={"key": key})
305295
return bool(result.deleted_count)
306296

307297
@override

0 commit comments

Comments
 (0)