Skip to content

Commit c9f5399

Browse files
feat: add RoutingWrapper and CollectionRoutingWrapper
Add routing wrapper functionality that allows routing requests to different stores based on a routing function. This enables flexible multi-backend storage strategies. Features: - RoutingWrapper: Base wrapper with custom routing function support - CollectionRoutingWrapper: Convenience wrapper for collection-based routing - Comprehensive test coverage with 11 tests covering all operations - Support for all AsyncKeyValue protocol methods (get, put, delete, ttl, and _many variants) Use cases: - Route different collections to different backends (e.g., sessions to Redis, users to DynamoDB) - Cost optimization by routing hot/cold data to different storage tiers - Compliance requirements (e.g., PII data to encrypted stores) - Development/testing with selective mock stores Closes #114 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: William Easton <[email protected]>
1 parent 1c8bf6f commit c9f5399

File tree

4 files changed

+436
-0
lines changed

4 files changed

+436
-0
lines changed
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
from key_value.aio.wrappers.routing.collection_routing import CollectionRoutingWrapper
2+
from key_value.aio.wrappers.routing.wrapper import RoutingFunction, RoutingWrapper
3+
4+
__all__ = ["CollectionRoutingWrapper", "RoutingFunction", "RoutingWrapper"]
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
from key_value.aio.protocols.key_value import AsyncKeyValue
2+
from key_value.aio.wrappers.routing.wrapper import RoutingWrapper
3+
4+
5+
class CollectionRoutingWrapper(RoutingWrapper):
6+
"""Routes operations based on collection name using a simple map.
7+
8+
This is a convenience wrapper that provides collection-based routing using a
9+
dictionary mapping collection names to stores. This is useful for directing
10+
different data types to different backing stores.
11+
12+
Example:
13+
router = CollectionRoutingWrapper(
14+
collection_map={
15+
"sessions": redis_store,
16+
"users": dynamo_store,
17+
"cache": memory_store,
18+
},
19+
default_store=disk_store
20+
)
21+
22+
# Gets from redis_store
23+
await router.get("session_id", collection="sessions")
24+
25+
# Gets from dynamo_store
26+
await router.get("user_id", collection="users")
27+
28+
# Gets from disk_store (default)
29+
await router.get("other_key", collection="unmapped_collection")
30+
"""
31+
32+
def __init__(
33+
self,
34+
collection_map: dict[str, AsyncKeyValue],
35+
default_store: AsyncKeyValue | None = None,
36+
) -> None:
37+
"""Initialize collection-based routing.
38+
39+
Args:
40+
collection_map: Mapping from collection name to store. Each collection
41+
name is mapped to its corresponding backing store.
42+
default_store: Store to use for unmapped collections. If None and a
43+
collection is not in the map, operations will raise ValueError.
44+
"""
45+
self.collection_map = collection_map
46+
47+
def route_by_collection(collection: str | None) -> AsyncKeyValue | None:
48+
if collection is None:
49+
return default_store
50+
return self.collection_map.get(collection, default_store)
51+
52+
super().__init__(
53+
routing_function=route_by_collection,
54+
default_store=default_store,
55+
)
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
from collections.abc import Callable, Mapping, Sequence
2+
from typing import Any, SupportsFloat
3+
4+
from typing_extensions import override
5+
6+
from key_value.aio.protocols.key_value import AsyncKeyValue
7+
from key_value.aio.wrappers.base import BaseWrapper
8+
9+
RoutingFunction = Callable[[str | None], AsyncKeyValue | None]
10+
11+
12+
class RoutingWrapper(BaseWrapper):
13+
"""Routes operations to different stores based on a routing function.
14+
15+
The routing function receives the collection name and returns the appropriate store.
16+
This allows dynamic routing of requests to different backing stores based on
17+
collection name, key patterns, or any other custom logic.
18+
19+
Example:
20+
def route_by_collection(collection: str | None) -> AsyncKeyValue | None:
21+
if collection == "sessions":
22+
return redis_store
23+
elif collection == "users":
24+
return dynamo_store
25+
return None
26+
27+
router = RoutingWrapper(
28+
routing_function=route_by_collection,
29+
default_store=memory_store
30+
)
31+
"""
32+
33+
def __init__(
34+
self,
35+
routing_function: RoutingFunction,
36+
default_store: AsyncKeyValue | None = None,
37+
) -> None:
38+
"""Initialize the routing wrapper.
39+
40+
Args:
41+
routing_function: Function that takes a collection name and returns the store to use.
42+
Should return None if no specific store is found.
43+
default_store: Optional fallback store if routing_function returns None.
44+
If both routing_function returns None and default_store is None,
45+
operations will raise ValueError.
46+
"""
47+
self.routing_function = routing_function
48+
self.default_store = default_store
49+
50+
def _get_store(self, collection: str | None) -> AsyncKeyValue:
51+
"""Get the appropriate store for the given collection.
52+
53+
Args:
54+
collection: The collection name to route.
55+
56+
Returns:
57+
The AsyncKeyValue store to use for this collection.
58+
59+
Raises:
60+
ValueError: If no store is found for the collection and no default store is configured.
61+
"""
62+
store = self.routing_function(collection)
63+
if store is None and self.default_store is not None:
64+
return self.default_store
65+
if store is None:
66+
msg = f"No store found for collection: {collection}"
67+
raise ValueError(msg)
68+
return store
69+
70+
@override
71+
async def get(self, key: str, *, collection: str | None = None) -> dict[str, Any] | None:
72+
store = self._get_store(collection)
73+
return await store.get(key=key, collection=collection)
74+
75+
@override
76+
async def get_many(self, keys: list[str], *, collection: str | None = None) -> list[dict[str, Any] | None]:
77+
store = self._get_store(collection)
78+
return await store.get_many(keys=keys, collection=collection)
79+
80+
@override
81+
async def ttl(self, key: str, *, collection: str | None = None) -> tuple[dict[str, Any] | None, float | None]:
82+
store = self._get_store(collection)
83+
return await store.ttl(key=key, collection=collection)
84+
85+
@override
86+
async def ttl_many(self, keys: list[str], *, collection: str | None = None) -> list[tuple[dict[str, Any] | None, float | None]]:
87+
store = self._get_store(collection)
88+
return await store.ttl_many(keys=keys, collection=collection)
89+
90+
@override
91+
async def put(self, key: str, value: Mapping[str, Any], *, collection: str | None = None, ttl: SupportsFloat | None = None) -> None:
92+
store = self._get_store(collection)
93+
return await store.put(key=key, value=value, collection=collection, ttl=ttl)
94+
95+
@override
96+
async def put_many(
97+
self,
98+
keys: list[str],
99+
values: Sequence[Mapping[str, Any]],
100+
*,
101+
collection: str | None = None,
102+
ttl: Sequence[SupportsFloat | None] | None = None,
103+
) -> None:
104+
store = self._get_store(collection)
105+
return await store.put_many(keys=keys, values=values, collection=collection, ttl=ttl)
106+
107+
@override
108+
async def delete(self, key: str, *, collection: str | None = None) -> bool:
109+
store = self._get_store(collection)
110+
return await store.delete(key=key, collection=collection)
111+
112+
@override
113+
async def delete_many(self, keys: list[str], *, collection: str | None = None) -> int:
114+
store = self._get_store(collection)
115+
return await store.delete_many(keys=keys, collection=collection)

0 commit comments

Comments
 (0)