Implement bulk operations for stores with native batch APIs #79
Conversation
Important: Review skipped. Bot user detected. To trigger a single review, invoke the @coderabbitai review command. This status message can be disabled in the CodeRabbit configuration.

📝 Walkthrough

This pull request implements batch operations across seven key-value store implementations: DynamoDB, Elasticsearch, Memcached, MongoDB, Redis, RocksDB, and Valkey. Each store gains three new methods: `_get_managed_entries`, `_put_managed_entries`, and `_delete_managed_entries`.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
participant Client as Client Code
participant Store as Store Implementation
participant Backend as Store Backend<br/>(DB/Cache/etc)
rect rgb(200, 220, 255)
Note over Client,Backend: Batch Get Operation
Client->>Store: _get_managed_entries([key1, key2, key3])
Store->>Backend: Bulk fetch (mget/find $in/batch_get_item)
Backend-->>Store: [result1, result2, result3]
Store->>Store: Decode/parse results<br/>maintain input order
Store-->>Client: [ManagedEntry|None, ...]
end
rect rgb(220, 255, 220)
Note over Client,Backend: Batch Put Operation
Client->>Store: _put_managed_entries([key1, key2], [entry1, entry2])
Store->>Store: Construct documents<br/>with TTL if present
Store->>Backend: Bulk write (batch_write_item/bulk/pipeline/WriteBatch)
Backend-->>Store: success
Store-->>Client: void
end
rect rgb(255, 220, 220)
Note over Client,Backend: Batch Delete Operation
Client->>Store: _delete_managed_entries([key1, key2, key3])
Store->>Backend: Bulk delete (batch_write_item/bulk/delete_many)
Backend-->>Store: count_deleted
Store-->>Client: 3 (count of deleted entries)
end
```
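The "maintain input order" step in the batch-get lane above is the part that differs most across backends. As a minimal sketch of that pattern (the `fetch_bulk` call, the compound-key format, and the return type are assumptions for illustration, not code from this PR):

```python
# Illustrative order-preserving batch get: one bulk backend call, results
# re-aligned to the caller's key order, None for keys that were not found.
async def get_many(client, collection: str, keys: list[str]) -> list[dict | None]:
    if not keys:
        return []
    combo_keys = [f"{collection}::{key}" for key in keys]  # assumed compound-key scheme
    # Single bulk round-trip; assume the backend returns a mapping of found keys to documents.
    found: dict[str, dict] = await client.fetch_bulk(combo_keys)
    # Re-align to input order; missing keys become None.
    return [found.get(combo_key) for combo_key in combo_keys]
```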
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes

Multiple store implementations with heterogeneous bulk API patterns (batch_get_item, mget, find $in, WriteBatch, pipelines) require individual validation. Logic density varies by store; TTL handling, order preservation, and per-store batch limits demand careful verification across all seven files. Consistency in method signatures and return types across implementations aids review but does not reduce the need for store-specific reasoning.
Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
…ched, Elasticsearch, and RocksDB stores Co-authored-by: strawgate <[email protected]>
…ilable) Co-authored-by: strawgate <[email protected]>
@coderabbitai review
✅ Actions performed: Review triggered.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
key-value/key-value-aio/src/key_value/aio/stores/dynamodb/store.py (1)
101-116: Fix potential AttributeError when a pre-connected client is provided.

When `client` is passed, `self._raw_client` is never set; `__aenter__` still reads it, which can raise `AttributeError`. Initialize it to `None` and guard with `is not None`.

```diff
@@
     ) -> None:
         """Initialize the DynamoDB store.
@@
-        self._table_name = table_name
+        self._table_name = table_name
+        self._raw_client = None  # always defined; only set when creating our own client
         if client:
             self._client = client
         else:
@@
     async def __aenter__(self) -> Self:
-        if self._raw_client:
+        if self._raw_client is not None:
             self._client = await self._raw_client.__aenter__()
         await super().__aenter__()
         return self
```

Also applies to: 118-123
🧹 Nitpick comments (7)
key-value/key-value-aio/src/key_value/aio/stores/valkey/store.py (1)
130-142: Clarify or implement the pipelining mentioned in the comment.

The comment on line 137 mentions "BaseClient supports pipelining through transaction," but the implementation awaits each `set` individually, resulting in separate round-trips per key. This doesn't leverage any pipelining optimization.

Consider one of the following:

- Remove the misleading comment if pipelining isn't beneficial here due to per-key TTL requirements:

```diff
-        # Valkey's mset doesn't support per-key TTL, so we need to use a different approach
-        # We'll use a pipeline-like approach with individual set commands
-        # Note: BaseClient supports pipelining through transaction
+        # Valkey's mset doesn't support per-key TTL, so we use individual set commands
         for key, managed_entry in zip(keys, managed_entries, strict=True):
```

- Implement actual pipelining if Glide's `BaseClient` supports batching multiple commands (e.g., via transactions or pipelines), which would reduce network round-trips even with individual SET commands:

```python
# Example (syntax depends on Glide client API):
async with self._client.pipeline() as pipe:
    for key, managed_entry in zip(keys, managed_entries, strict=True):
        combo_key: str = compound_key(collection=collection, key=key)
        json_value: str = managed_entry.to_json()
        expiry: ExpirySet | None = ExpirySet(expiry_type=ExpiryType.SEC, value=int(managed_entry.ttl)) if managed_entry.ttl else None
        pipe.set(key=combo_key, value=json_value, expiry=expiry)
    await pipe.execute()
```

key-value/key-value-aio/src/key_value/aio/stores/redis/store.py (1)
134-155: Efficient bulk write implementation with Redis pipeline.

The pipeline-based approach correctly batches SET/SETEX operations, reducing network overhead. The `strict=True` argument in zip (line 144) provides good validation, and TTL handling is consistent with the single-entry method.

Optional: Consider type hint consistency for the keys parameter.

For consistency with the `managed_entries` parameter, which uses `Sequence[ManagedEntry]`, you could change `keys: list[str]` to `keys: Sequence[str]` (applies to all three bulk methods). This would make the API more flexible for callers.

```diff
-    async def _put_managed_entries(self, *, collection: str, keys: list[str], managed_entries: Sequence[ManagedEntry]) -> None:
+    async def _put_managed_entries(self, *, collection: str, keys: Sequence[str], managed_entries: Sequence[ManagedEntry]) -> None:
```

key-value/key-value-aio/src/key_value/aio/stores/rocksdb/store.py (1)
130-146: LGTM! Consider moving WriteBatch import to module level.

The batch write implementation is correct and leverages RocksDB's `WriteBatch` for atomic operations. The use of `zip(..., strict=True)` ensures the keys and managed_entries sequences match in length, which is a good safety measure.

For consistency with the `Rdict` import pattern (lines 13-16), consider moving the `WriteBatch` import to the module-level try-except block:

```diff
 try:
-    from rocksdict import Options, Rdict
+    from rocksdict import Options, Rdict, WriteBatch
 except ImportError as e:
```

Then remove the local imports at lines 138-139 and 170.
key-value/key-value-aio/src/key_value/aio/stores/mongodb/store.py (3)
157-176: Add projection and de-duplicate keys; consider chunking large $in queries.

Reduce payload and query size with projection and by de-duplicating input keys while preserving output order.
Apply this minimal diff:
```diff
@@
-        collection = self._sanitize_collection_name(collection=collection)
+        collection = self._sanitize_collection_name(collection=collection)
@@
-        # Use find with $in operator to get multiple documents at once
-        cursor = self._collections_by_name[collection].find(filter={"key": {"$in": keys}})
+        # Use find with $in; de-dupe keys and project only needed fields
+        query_keys = list(dict.fromkeys(keys))
+        cursor = self._collections_by_name[collection].find(
+            filter={"key": {"$in": query_keys}},
+            projection={"key": True, "value": True, "_id": False},
+        )
```

Optional: if keys can be very large, process in chunks (e.g., 1k–5k) to avoid hitting BSON/command size limits; merge results into docs_by_key.
205-236: Use unordered bulk writes to avoid abort-on-first-error; move UpdateOne import to module scope.

Unordered writes keep going if one op fails; moving the import avoids per-call overhead.
Apply this diff in the current block:
```diff
-        _ = await self._collections_by_name[collection].bulk_write(operations)
+        _ = await self._collections_by_name[collection].bulk_write(operations, ordered=False)
```

And adjust imports (outside this block) so UpdateOne is available at module scope:
```diff
@@
-try:
-    from pymongo import AsyncMongoClient
+try:
+    from pymongo import AsyncMongoClient, UpdateOne
@@
-        # Use bulk_write for efficient batch operations
-        from pymongo import UpdateOne
+        # Use bulk_write for efficient batch operations
```

Optional: use $setOnInsert for created_at to avoid overwriting an existing creation timestamp on upsert.
245-254: Guard against oversized $in by chunking deletes for very large key sets.

Single delete_many with a huge $in can hit command size limits; chunk and sum deleted_count.
Example change:
```diff
-        # Use delete_many with $in operator for efficient batch deletion
-        result: DeleteResult = await self._collections_by_name[collection].delete_many(filter={"key": {"$in": keys}})
-        return result.deleted_count
+        # Chunk to avoid command size limits on very large key lists
+        deleted = 0
+        CHUNK = 1000
+        for i in range(0, len(keys), CHUNK):
+            chunk = keys[i : i + CHUNK]
+            result: DeleteResult = await self._collections_by_name[collection].delete_many(filter={"key": {"$in": chunk}})
+            deleted += int(result.deleted_count)
+        return deleted
```

key-value/key-value-aio/src/key_value/aio/stores/memcached/store.py (1)
117-139: Consider using asyncio.gather for concurrent execution.

The comment on line 123 mentions thread-safety concerns, but this is misleading: async operations don't involve threading. Using `asyncio.gather` would enable concurrent execution of the individual `set` operations, improving performance over the current sequential loop:

```python
@override
async def _put_managed_entries(self, *, collection: str, keys: list[str], managed_entries: Sequence[ManagedEntry]) -> None:
    if not keys:
        return

    async def put_single(key: str, managed_entry: ManagedEntry) -> None:
        combo_key: str = self.sanitize_key(compound_key(collection=collection, key=key))
        exptime: int
        if managed_entry.ttl is None:  # noqa: SIM108
            exptime = 0
        else:
            exptime = max(int(managed_entry.ttl), 1)
        json_value: str = managed_entry.to_json()
        _ = await self._client.set(
            key=combo_key.encode(encoding="utf-8"),
            value=json_value.encode(encoding="utf-8"),
            exptime=exptime,
        )

    await asyncio.gather(*(put_single(key, entry) for key, entry in zip(keys, managed_entries, strict=True)))
```

This would leverage network concurrency while maintaining the same semantics.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- key-value/key-value-aio/src/key_value/aio/stores/dynamodb/store.py (5 hunks)
- key-value/key-value-aio/src/key_value/aio/stores/elasticsearch/store.py (4 hunks)
- key-value/key-value-aio/src/key_value/aio/stores/memcached/store.py (3 hunks)
- key-value/key-value-aio/src/key_value/aio/stores/mongodb/store.py (3 hunks)
- key-value/key-value-aio/src/key_value/aio/stores/redis/store.py (3 hunks)
- key-value/key-value-aio/src/key_value/aio/stores/rocksdb/store.py (3 hunks)
- key-value/key-value-aio/src/key_value/aio/stores/valkey/store.py (3 hunks)
🧰 Additional context used
🪛 GitHub Actions: Run Tests
key-value/key-value-aio/src/key_value/aio/stores/dynamodb/store.py
[error] 207-207: pyright type error: Argument of type "dict[str, dict[str, list[dict[str, dict[str, str]]]]]" cannot be assigned to parameter "RequestItems" of type "Mapping[str, KeysAndAttributesUnionTypeDef]" in function "batch_get_item".
🪛 GitHub Check: static_analysis (key-value/key-value-aio)
key-value/key-value-aio/src/key_value/aio/stores/dynamodb/store.py
[failure] 207-207:
Argument of type "dict[str, dict[str, list[dict[str, dict[str, str]]]]]" cannot be assigned to parameter "RequestItems" of type "Mapping[str, KeysAndAttributesUnionTypeDef]" in function "batch_get_item"
"dict[str, dict[str, list[dict[str, dict[str, str]]]]]" is not assignable to "Mapping[str, KeysAndAttributesUnionTypeDef]"
Type parameter "_VT_co@Mapping" is covariant, but "dict[str, list[dict[str, dict[str, str]]]]" is not a subtype of "KeysAndAttributesUnionTypeDef"
Type "dict[str, list[dict[str, dict[str, str]]]]" is not assignable to type "KeysAndAttributesUnionTypeDef"
"dict[str, list[dict[str, dict[str, str]]]]" is not assignable to "KeysAndAttributesTypeDef"
"dict[str, list[dict[str, dict[str, str]]]]" is not assignable to "KeysAndAttributesOutputTypeDef" (reportArgumentType)
[failure] 278-278:
Argument type is partially unknown
Argument corresponds to parameter "RequestItems" in function "batch_write_item"
Argument type is "dict[str, list[Unknown]]" (reportUnknownArgumentType)
[failure] 276-276:
Type of "request_items" is partially unknown
Type of "request_items" is "dict[str, list[Unknown]]" (reportUnknownVariableType)
[failure] 274-274:
Type of "append" is partially unknown
Type of "append" is "(object: Unknown, /) -> None" (reportUnknownMemberType)
[failure] 323-323:
Argument of type "dict[str, list[dict[str, dict[str, dict[str, dict[str, str]]]]]]" cannot be assigned to parameter "RequestItems" of type "Mapping[str, Sequence[WriteRequestUnionTypeDef]]" in function "batch_write_item"
"dict[str, list[dict[str, dict[str, dict[str, dict[str, str]]]]]]" is not assignable to "Mapping[str, Sequence[WriteRequestUnionTypeDef]]"
Type parameter "_VT_co@Mapping" is covariant, but "list[dict[str, dict[str, dict[str, dict[str, str]]]]]" is not a subtype of "Sequence[WriteRequestUnionTypeDef]"
"list[dict[str, dict[str, dict[str, dict[str, str]]]]]" is not assignable to "Sequence[WriteRequestUnionTypeDef]"
Type parameter "_T_co@Sequence" is covariant, but "dict[str, dict[str, dict[str, dict[str, str]]]]" is not a subtype of "WriteRequestUnionTypeDef"
Type "dict[str, dict[str, dict[str, dict[str, str]]]]" is not assignable to type "WriteRequestUnionTypeDef" (reportArgumentType)
key-value/key-value-aio/src/key_value/aio/stores/elasticsearch/store.py
[failure] 299-299:
Argument type is partially unknown
Argument corresponds to parameter "operations" in function "bulk"
Argument type is "list[Unknown]" (reportUnknownArgumentType)
[failure] 297-297:
Type of "append" is partially unknown
Type of "append" is "(object: Unknown, /) -> None" (reportUnknownMemberType)
[failure] 296-296:
Type of "append" is partially unknown
Type of "append" is "(object: Unknown, /) -> None" (reportUnknownMemberType)
[failure] 333-333:
Argument type is partially unknown
Argument corresponds to parameter "operations" in function "bulk"
Argument type is "list[Unknown]" (reportUnknownArgumentType)
[failure] 331-331:
Type of "append" is partially unknown
Type of "append" is "(object: Unknown, /) -> None" (reportUnknownMemberType)
🔇 Additional comments (18)
key-value/key-value-aio/src/key_value/aio/stores/valkey/store.py (3)
1-1: LGTM! The import of `Sequence` is correctly used for the type annotation in `_put_managed_entries`.
95-112: LGTM! The implementation correctly uses Valkey's `mget` for bulk reads, handles empty lists, maintains result order, and properly converts responses to `ManagedEntry` objects or `None`.
149-157: LGTM! The implementation correctly uses Valkey's bulk `delete` operation, handles empty lists, and returns the count of deleted entries.

key-value/key-value-aio/src/key_value/aio/stores/redis/store.py (3)
1-1: LGTM: Import addition supports bulk operation typing.

The `Sequence` import is appropriately used for the `managed_entries` parameter type hint, providing flexibility for callers.
96-112: Excellent bulk read implementation using MGET.

The implementation correctly leverages Redis's native MGET command to fetch multiple keys in a single network round-trip. The empty list guard, compound key construction, and response handling (string conversion to ManagedEntry or None) are all consistent with the single-entry method.
163-172: Efficient bulk delete implementation using Redis DELETE.

The implementation correctly uses Redis's multi-key DELETE command, which atomically removes all specified keys and returns the count of deleted entries. The empty list guard and compound key construction are appropriate.
key-value/key-value-aio/src/key_value/aio/stores/rocksdb/store.py (2)
1-1: LGTM! The `Sequence` import is correctly added to support the type hint for `managed_entries` in the new batch method.
162-186: LGTM! Batch deletes correctly implemented.

The implementation correctly uses `WriteBatch` for atomic batch deletions and is consistent with the single-key `_delete_managed_entry` method. The optimization to skip the write when no deletions occur is good.

Note on performance: The existence checks (lines 175-181) iterate individually through keys, which could be slower for large batches compared to a hypothetical multi_get operation. However, this is unavoidable given that `rocksdict` doesn't provide a multi_get API, as noted in the PR summary.

key-value/key-value-aio/src/key_value/aio/stores/elasticsearch/store.py (4)
1-1: LGTM: Import addition is correct.

The `Sequence` import is necessary for the type hint in `_put_managed_entries` and is correctly imported from `collections.abc`.
187-236: LGTM: Batch retrieval implementation is correct.

The `_get_managed_entries` method properly:

- Uses Elasticsearch's `mget` API for efficient batch retrieval
- Handles empty input lists
- Preserves input order through dictionary lookup
- Returns `None` for missing, malformed, or invalid documents
- Follows the same data extraction and validation patterns as the single-entry method
269-300: LGTM: Batch indexing implementation is correct.

The `_put_managed_entries` method properly:

- Uses Elasticsearch's `bulk` API for efficient batch indexing
- Handles empty input lists
- Uses `strict=True` in zip to ensure equal-length inputs
- Constructs documents with the same structure and metadata as the single-entry method
- Respects the refresh policy via `_should_refresh_on_put`

Note: The static analysis warnings about "partially unknown" types for the operations list are false positives due to the dynamic nature of Elasticsearch's bulk API.
316-346: LGTM: Batch deletion implementation is correct.

The `_delete_managed_entries` method properly:

- Uses Elasticsearch's `bulk` API for efficient batch deletion
- Handles empty input lists
- Correctly counts successful deletions by checking `result == "deleted"`
- Returns the count of successfully deleted items
- Maintains consistency with the single-entry method by not applying refresh policy
Note: The static analysis warnings are false positives similar to those in
_put_managed_entries.key-value/key-value-aio/src/key_value/aio/stores/mongodb/store.py (2)
1-1: LGTM: correct import choice.Using collections.abc.Sequence is the right runtime import in modern Python.
205-236: The project explicitly requires Python ≥3.10 — zip(strict=True) is valid and already used throughout the codebase.All
pyproject.tomlfiles specifyrequires-python = ">=3.10", which fully supportszip(strict=True)(available since Python 3.10). This pattern is already consistently used across 12+ files in the repository, including base stores, wrappers, and test utilities. No fallback is required.Likely an incorrect or invalid review comment.
key-value/key-value-aio/src/key_value/aio/stores/memcached/store.py (3)
2-2: LGTM!The
Sequenceimport is appropriate for the type hint in_put_managed_entriesand follows Python best practices.
70-89: LGTM! Efficient batch read implementation.The method properly leverages
multi_getfor efficient batch retrieval, handles empty input gracefully, and maintains consistency with the single-key_get_managed_entrymethod.
141-145: No action required—memcached store correctly omits _delete_managed_entries.aiomcache does not provide a built-in "delete multiple keys" API. The PR objectives specify implementing
_delete_managed_entriesfor stores with native batch APIs. Since aiomcache only supports individualdelete(key)operations, the memcached store's omission of_delete_managed_entriesis correct and aligns with the stated PR objectives.key-value/key-value-aio/src/key_value/aio/stores/dynamodb/store.py (1)
25-27: Batch limits look correct. 100 for BatchGetItem and 25 for BatchWriteItem match DynamoDB limits. No issues.
@copilot it feels like the base store should perhaps have some bulk batch helpers, with each store just defining what batch size it can accept and the base store handling the batching?
Co-authored-by: strawgate <[email protected]>
Good suggestion! I've added a batching helper to the base store. The DynamoDB store has been refactored to use this helper (commit 808a899), replacing its manual batching logic. Other stores can now easily use this helper if they need to batch their bulk operations.
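For reference, a base-store batching helper along the lines discussed here might look like the sketch below. The class and member names (`BatchingBaseSketch`, `max_write_batch_size`, `_put_batch`) are illustrative assumptions, not the helper that was actually added (and later removed):

```python
from collections.abc import Sequence
from typing import TypeVar

T = TypeVar("T")


def chunk(items: Sequence[T], size: int) -> list[Sequence[T]]:
    """Split a sequence into chunks of at most `size` items."""
    return [items[i : i + size] for i in range(0, len(items), size)]


class BatchingBaseSketch:
    """Hypothetical base-store mixin: the store declares a batch size, the base handles chunking."""

    # None means "no limit"; a DynamoDB subclass might set 25 (the BatchWriteItem limit).
    max_write_batch_size: int | None = None

    async def put_many(self, collection: str, keys: Sequence[str], values: Sequence[str]) -> None:
        size = self.max_write_batch_size or len(keys) or 1
        for key_chunk, value_chunk in zip(chunk(keys, size), chunk(values, size), strict=True):
            await self._put_batch(collection, key_chunk, value_chunk)

    async def _put_batch(self, collection: str, keys: Sequence[str], values: Sequence[str]) -> None:
        """Subclasses override this with their native bulk write."""
        raise NotImplementedError
```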
@copilot let's remove any batching implementations for now so no bulk calls for dynamo but let's proceed with the rest
Co-authored-by: strawgate <[email protected]>
Done! Removed the DynamoDB bulk operations and the base-store batching helper. All other stores (Redis, Valkey, MongoDB, Memcached, Elasticsearch, RocksDB) still have their bulk operations intact and tests are passing.
Claude finished @strawgate's task — View job

Completed Code Review Improvements

I've successfully implemented the improvements suggested by the CodeRabbit review, skipping MongoDB and Elasticsearch as you requested.

Summary of Changes

Valkey Store (
Redis Store (
RocksDB Store (
Memcached Store (
Quality Checks
What Was Changed
These improvements enhance code quality, consistency, and performance without changing the external API or breaking existing tests.
- Remove misleading pipelining comment in Valkey store
- Move WriteBatch import to module level in RocksDB store for consistency
- Use asyncio.gather for concurrent operations in Memcached store
- Change keys parameter type from list[str] to Sequence[str] in all stores for consistency with base class

These changes address CodeRabbit review feedback while maintaining functionality.

Co-authored-by: William Easton <[email protected]>
@claude static tests are failing can you fix please
@claude add ttl to _put_managed_entries so that we don't have to inspect a managed entry to get the ttl info
Claude finished @strawgate's task — View job

Completed: Refactored `_put_managed_entries` to take explicit TTL and timestamp parameters (see the commit below).
Instead of extracting TTL and timestamps from managed entries, these values are now passed as explicit parameters to _put_managed_entries. This avoids the need to inspect managed entries to extract metadata.

Changes:
- Base store: Added ttl, created_at, expires_at parameters to the signature
- Base store put_many: Pre-calculates timestamps once for all entries
- Redis store: Uses the ttl parameter instead of extracting it from the first entry
- MongoDB store: Uses the timestamp parameters instead of extracting them from the first entry
- Elasticsearch store: Uses the timestamp parameters instead of extracting them from the first entry
- RocksDB store: Updated signature (stores full JSON, doesn't use the parameters)

Co-authored-by: William Easton <[email protected]>
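Based on that description, the refactored base-store hook presumably looks roughly like the sketch below; the exact parameter types and defaults are assumptions, only the `ttl`, `created_at`, and `expires_at` names come from the commit message:

```python
from collections.abc import Sequence
from datetime import datetime


class BaseStoreSketch:
    # Hedged sketch of the refactored signature: TTL and timestamps arrive as explicit
    # arguments, so implementations no longer inspect the managed entries for metadata.
    async def _put_managed_entries(
        self,
        *,
        collection: str,
        keys: Sequence[str],
        managed_entries: Sequence["ManagedEntry"],  # project type, referenced by name only
        ttl: float | None = None,
        created_at: datetime | None = None,
        expires_at: datetime | None = None,
    ) -> None:
        raise NotImplementedError
```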
…-methods-on-stores
…-methods-on-stores


Overview
This PR implements efficient bulk operations (`_get_managed_entries`, `_put_managed_entries`, `_delete_managed_entries`) for stores that provide native batch APIs. Previously, these stores fell back to the base implementation, which simply loops over individual operations, resulting in significant performance overhead for bulk operations.

Changes
Stores with Bulk API Support
The following stores now leverage their native bulk APIs (a sketch of the Redis write path follows this list):

Redis Store
- `MGET` command for efficient batch retrieval
- Pipelined `SET`/`SETEX` operations for batch writes
- Multi-key `DELETE` command for batch deletes

Valkey Store
- `MGET` command (Redis-compatible)
- Individual `SET` operations (no TTL-aware bulk API available)
- Multi-key `DELETE` command

MongoDB Store
- `find` with `$in` operator for efficient bulk retrieval
- `bulk_write` with `UpdateOne` operations for atomic batch updates
- `delete_many` with `$in` operator

Memcached Store
- `multi_get` command for batch retrieval
- Individual `SET` operations (aiomcache doesn't provide multi_set)

Elasticsearch Store
- `mget` API for efficient multi-document retrieval
- `bulk` API for batch indexing operations
- `bulk` API for batch deletion with proper result counting

RocksDB Store
- `WriteBatch` for atomic batch write operations
- `WriteBatch` for atomic batch delete operations

DynamoDB Store
- No bulk implementation in this PR (removed per review feedback; it continues to use the base per-key operations)
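As a concrete illustration of the write path mentioned for the Redis store, the pipelined bulk put roughly follows the pattern below. This is a sketch only: the key format, TTL handling, and function shape are assumptions rather than the exact PR code.

```python
from collections.abc import Sequence

from redis.asyncio import Redis


async def redis_put_many(client: Redis, collection: str, keys: Sequence[str], json_values: Sequence[str], ttl: float | None) -> None:
    if not keys:
        return
    pipe = client.pipeline()
    for key, json_value in zip(keys, json_values, strict=True):
        combo_key = f"{collection}::{key}"  # assumed compound-key format
        if ttl is None:
            pipe.set(combo_key, json_value)  # plain SET
        else:
            pipe.set(combo_key, json_value, ex=int(ttl))  # SET with expiry (SETEX semantics)
    await pipe.execute()  # all queued commands flushed in a single round-trip
```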
Performance Impact
These changes significantly improve performance for bulk operations by replacing per-key round-trips with a single batched call (or a small number of them) wherever the backend supports it.
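A rough way to observe the difference locally might look like the harness below. The `put`/`put_many` signatures shown are assumptions about the public API; adapt them to the actual store interface, and note that no timing numbers are implied here.

```python
import time


async def compare(store, collection: str, n: int = 1_000) -> None:
    keys = [f"key-{i}" for i in range(n)]
    values = [{"value": i} for i in range(n)]

    start = time.perf_counter()
    for key, value in zip(keys, values, strict=True):
        await store.put(collection=collection, key=key, value=value)  # per-key round-trips
    per_key_seconds = time.perf_counter() - start

    start = time.perf_counter()
    await store.put_many(collection=collection, keys=keys, values=values)  # bulk path
    bulk_seconds = time.perf_counter() - start

    print(f"per-key: {per_key_seconds:.3f}s  bulk: {bulk_seconds:.3f}s")
```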
Testing
All existing tests pass (1,459 passed, 143 skipped). The existing test suite already covers bulk operations through `test_put_many_get_many`, `test_put_many_delete_many`, and related performance tests, so no new tests were required.

Implementation Notes
Fixes #78
Original prompt
Fixes #78
Summary by CodeRabbit
Release Notes