-
Notifications
You must be signed in to change notification settings - Fork 2
Add four new resilience and versioning wrappers #191
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
This commit implements four new protocol-compliant wrappers based on industry patterns from resilience frameworks and caching libraries: 1. CircuitBreakerWrapper - Prevents cascading failures by blocking requests to failing backends - Implements the circuit breaker pattern with CLOSED/OPEN/HALF_OPEN states - Configurable failure threshold, recovery timeout, and success threshold - Essential for production resilience 2. RateLimitWrapper - Protects backends from overload with request throttling - Supports both sliding and fixed window strategies - Configurable request limits and time windows - Uses asyncio primitives for concurrent request tracking 3. VersioningWrapper - Enables schema evolution with automatic version tagging - Auto-invalidates cache entries with mismatched versions - Stores version metadata within value dict (similar to CompressionWrapper) - Useful for deployment coordination and schema migration 4. BulkheadWrapper - Isolates operations with bounded resource pools - Limits concurrent operations using asyncio.Semaphore - Configurable queue size to prevent unbounded growth - Prevents resource exhaustion and enables graceful degradation All wrappers: - Follow existing patterns (RetryWrapper, TimeoutWrapper, CompressionWrapper) - Include comprehensive test coverage - Use proper TypeVar and Callable typing for type safety - Have detailed docstrings with usage examples 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: William Easton <[email protected]>
📝 WalkthroughWalkthroughAdds four async wrappers (Bulkhead, CircuitBreaker, RateLimit, Versioning) with package-level exports, associated wrapper errors, a sync-generated Versioning wrapper, comprehensive async/sync tests, and updates to the sync build script exclusion lists. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant BulkheadWrapper
participant Backend
rect rgb(220,235,255)
Note over BulkheadWrapper: Bulkhead pattern (concurrency + waiting queue)
Client->>BulkheadWrapper: operation(...)
activate BulkheadWrapper
BulkheadWrapper->>BulkheadWrapper: check waiting_count & semaphore
alt slot acquired
BulkheadWrapper->>Backend: delegate operation
Backend-->>BulkheadWrapper: result / error
BulkheadWrapper->>BulkheadWrapper: release semaphore / adjust waiting_count
BulkheadWrapper-->>Client: return / raise
else queue full
BulkheadWrapper-->>Client: raise BulkheadFullError
end
deactivate BulkheadWrapper
end
sequenceDiagram
participant Client
participant CircuitBreaker
participant Backend
rect rgb(255,235,235)
Note over CircuitBreaker: Circuit breaker flow
Client->>CircuitBreaker: operation(...)
activate CircuitBreaker
CircuitBreaker->>CircuitBreaker: _check_circuit()
alt state == OPEN and timeout not elapsed
CircuitBreaker-->>Client: raise CircuitOpenError
else
CircuitBreaker->>Backend: attempt operation
Backend-->>CircuitBreaker: success / exception
alt success
CircuitBreaker->>CircuitBreaker: _on_success()
CircuitBreaker-->>Client: return result
else failure
CircuitBreaker->>CircuitBreaker: _on_failure()
CircuitBreaker-->>Client: propagate error
end
end
deactivate CircuitBreaker
end
sequenceDiagram
participant Client
participant VersioningWrapper
participant Backend
rect rgb(250,250,220)
Note over VersioningWrapper: Versioning put/get
Client->>VersioningWrapper: put(key, value)
activate VersioningWrapper
VersioningWrapper->>VersioningWrapper: _wrap_value(value)
VersioningWrapper->>Backend: put(key, wrapped)
VersioningWrapper-->>Client: OK
deactivate VersioningWrapper
Client->>VersioningWrapper: get(key)
activate VersioningWrapper
VersioningWrapper->>Backend: get(key)
Backend-->>VersioningWrapper: stored_value
VersioningWrapper->>VersioningWrapper: _unwrap_value(stored_value)
alt versions match or unversioned
VersioningWrapper-->>Client: value
else mismatch
VersioningWrapper-->>Client: None
end
deactivate VersioningWrapper
end
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 9
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (15)
key-value/key-value-aio/src/key_value/aio/wrappers/bulkhead/__init__.py(1 hunks)key-value/key-value-aio/src/key_value/aio/wrappers/bulkhead/wrapper.py(1 hunks)key-value/key-value-aio/src/key_value/aio/wrappers/circuit_breaker/__init__.py(1 hunks)key-value/key-value-aio/src/key_value/aio/wrappers/circuit_breaker/wrapper.py(1 hunks)key-value/key-value-aio/src/key_value/aio/wrappers/rate_limit/__init__.py(1 hunks)key-value/key-value-aio/src/key_value/aio/wrappers/rate_limit/wrapper.py(1 hunks)key-value/key-value-aio/src/key_value/aio/wrappers/versioning/__init__.py(1 hunks)key-value/key-value-aio/src/key_value/aio/wrappers/versioning/wrapper.py(1 hunks)key-value/key-value-aio/tests/stores/wrappers/test_bulkhead.py(1 hunks)key-value/key-value-aio/tests/stores/wrappers/test_circuit_breaker.py(1 hunks)key-value/key-value-aio/tests/stores/wrappers/test_rate_limit.py(1 hunks)key-value/key-value-aio/tests/stores/wrappers/test_versioning.py(1 hunks)key-value/key-value-shared/src/key_value/shared/errors/wrappers/bulkhead.py(1 hunks)key-value/key-value-shared/src/key_value/shared/errors/wrappers/circuit_breaker.py(1 hunks)key-value/key-value-shared/src/key_value/shared/errors/wrappers/rate_limit.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (15)
key-value/key-value-aio/src/key_value/aio/wrappers/versioning/__init__.py (1)
key-value/key-value-aio/src/key_value/aio/wrappers/versioning/wrapper.py (1)
VersioningWrapper(14-124)
key-value/key-value-aio/src/key_value/aio/wrappers/rate_limit/wrapper.py (3)
key-value/key-value-shared/src/key_value/shared/errors/wrappers/rate_limit.py (1)
RateLimitExceededError(4-11)key-value/key-value-aio/src/key_value/aio/protocols/key_value.py (1)
AsyncKeyValue(185-190)key-value/key-value-aio/src/key_value/aio/wrappers/bulkhead/wrapper.py (8)
get(93-94)get_many(97-98)ttl(101-102)ttl_many(105-106)put(109-110)put_many(113-121)delete(124-125)delete_many(128-129)
key-value/key-value-aio/tests/stores/wrappers/test_versioning.py (1)
key-value/key-value-aio/src/key_value/aio/wrappers/versioning/wrapper.py (7)
VersioningWrapper(14-124)put(110-112)get(88-90)get_many(93-95)ttl(98-102)ttl_many(105-107)put_many(115-124)
key-value/key-value-shared/src/key_value/shared/errors/wrappers/circuit_breaker.py (1)
key-value/key-value-shared/src/key_value/shared/errors/key_value.py (1)
KeyValueOperationError(6-7)
key-value/key-value-aio/src/key_value/aio/wrappers/versioning/wrapper.py (1)
key-value/key-value-aio/src/key_value/aio/protocols/key_value.py (1)
AsyncKeyValue(185-190)
key-value/key-value-shared/src/key_value/shared/errors/wrappers/bulkhead.py (1)
key-value/key-value-shared/src/key_value/shared/errors/key_value.py (1)
KeyValueOperationError(6-7)
key-value/key-value-aio/tests/stores/wrappers/test_bulkhead.py (2)
key-value/key-value-shared/src/key_value/shared/errors/wrappers/bulkhead.py (1)
BulkheadFullError(4-11)key-value/key-value-aio/src/key_value/aio/wrappers/bulkhead/wrapper.py (4)
BulkheadWrapper(14-129)get(93-94)put(109-110)delete(124-125)
key-value/key-value-aio/src/key_value/aio/wrappers/circuit_breaker/__init__.py (1)
key-value/key-value-aio/src/key_value/aio/wrappers/circuit_breaker/wrapper.py (1)
CircuitBreakerWrapper(23-172)
key-value/key-value-aio/src/key_value/aio/wrappers/circuit_breaker/wrapper.py (3)
key-value/key-value-shared/src/key_value/shared/errors/wrappers/circuit_breaker.py (1)
CircuitOpenError(4-11)key-value/key-value-aio/src/key_value/aio/protocols/key_value.py (1)
AsyncKeyValue(185-190)key-value/key-value-aio/tests/stores/wrappers/test_circuit_breaker.py (2)
get(19-24)get(167-170)
key-value/key-value-aio/src/key_value/aio/wrappers/rate_limit/__init__.py (1)
key-value/key-value-aio/src/key_value/aio/wrappers/rate_limit/wrapper.py (1)
RateLimitWrapper(14-162)
key-value/key-value-shared/src/key_value/shared/errors/wrappers/rate_limit.py (1)
key-value/key-value-shared/src/key_value/shared/errors/key_value.py (1)
KeyValueOperationError(6-7)
key-value/key-value-aio/src/key_value/aio/wrappers/bulkhead/wrapper.py (3)
key-value/key-value-shared/src/key_value/shared/errors/wrappers/bulkhead.py (1)
BulkheadFullError(4-11)key-value/key-value-aio/src/key_value/aio/protocols/key_value.py (1)
AsyncKeyValue(185-190)key-value/key-value-aio/tests/stores/wrappers/test_bulkhead.py (2)
get(21-28)get(137-139)
key-value/key-value-aio/tests/stores/wrappers/test_circuit_breaker.py (2)
key-value/key-value-shared/src/key_value/shared/errors/wrappers/circuit_breaker.py (1)
CircuitOpenError(4-11)key-value/key-value-aio/src/key_value/aio/wrappers/circuit_breaker/wrapper.py (5)
CircuitBreakerWrapper(23-172)CircuitState(15-20)get(136-137)put(152-153)delete(167-168)
key-value/key-value-aio/src/key_value/aio/wrappers/bulkhead/__init__.py (1)
key-value/key-value-aio/src/key_value/aio/wrappers/bulkhead/wrapper.py (1)
BulkheadWrapper(14-129)
key-value/key-value-aio/tests/stores/wrappers/test_rate_limit.py (2)
key-value/key-value-shared/src/key_value/shared/errors/wrappers/rate_limit.py (1)
RateLimitExceededError(4-11)key-value/key-value-aio/src/key_value/aio/wrappers/rate_limit/wrapper.py (4)
RateLimitWrapper(14-162)put(138-140)get(118-120)delete(155-157)
🔇 Additional comments (18)
key-value/key-value-shared/src/key_value/shared/errors/wrappers/rate_limit.py (1)
4-11: Good addition of contextual error details.The error surfaces precise quota data through
extra_info, matching the shared error contract and aiding callers in backoff logic.key-value/key-value-aio/src/key_value/aio/wrappers/rate_limit/__init__.py (1)
1-3: Public export matches wrapper pattern.Re-export keeps the package API consistent with other wrappers.
key-value/key-value-aio/tests/stores/wrappers/test_rate_limit.py (1)
12-132: Solid coverage for rate limiting behavior.These scenarios validate both strategies, window resets, mixed ops, and concurrency, giving strong confidence in the wrapper.
key-value/key-value-aio/src/key_value/aio/wrappers/versioning/wrapper.py (6)
14-44: Excellent documentation and clear use cases.The docstring effectively explains the versioning pattern, provides concrete examples, and clearly demonstrates the auto-invalidation behavior. This will help users understand when and how to use this wrapper.
88-90: LGTM!Clean delegation pattern with proper unwrapping.
93-95: LGTM!Correctly unwraps each value in the bulk operation.
98-102: LGTM!Correctly nullifies TTL when version mismatch occurs, maintaining consistency.
110-112: LGTM!Correctly converts to dict and wraps before delegating to the underlying store.
115-123: LGTM!Bulk operation correctly wraps all values before delegation.
key-value/key-value-aio/src/key_value/aio/wrappers/versioning/__init__.py (1)
1-3: LGTM!Clean package initialization following standard Python patterns.
key-value/key-value-aio/tests/stores/wrappers/test_versioning.py (6)
9-13: LGTM!Fixture correctly instantiates VersioningWrapper with MemoryStore.
15-58: Excellent basic test coverage.These tests thoroughly cover the fundamental operations: wrapping/unwrapping, metadata storage, version mismatch handling, and backward compatibility with unversioned data. The assertions directly inspect the underlying store to verify correct wrapping behavior.
60-92: Good coverage of version types and bulk operations.Tests verify integer versions work correctly and that
get_manyproperly handles mixed-version scenarios, returning None for mismatched versions.
94-127: TTL operations properly tested.Both
ttlandttl_manyare tested with version mismatches, correctly verifying that TTL is also set to None when version doesn't match. This ensures consistent behavior across the API.
129-156: Comprehensive edge case coverage.The
put_manytest verifies bulk wrapping, and the double-wrap prevention test ensures that already-versioned data isn't nested. However, note that the double-wrap prevention logic in the implementation (line 65 of wrapper.py) could allow user data with__version__key to bypass versioning—see comment on wrapper.py.
158-180: Excellent realistic scenario test.This schema evolution test demonstrates a real-world use case and validates the entire versioning flow: cache invalidation on schema change, reloading with new schema, and bidirectional version isolation. This is exactly the use case the wrapper was designed for.
key-value/key-value-aio/src/key_value/aio/wrappers/circuit_breaker/__init__.py (1)
1-3: Re-export looks solid.Public surface matches the wrapper module; no issues.
key-value/key-value-shared/src/key_value/shared/errors/wrappers/circuit_breaker.py (1)
4-11: Exception wiring aligns with shared error pattern.Constructor cleanly extends KeyValueOperationError; looks good.
key-value/key-value-aio/src/key_value/aio/wrappers/circuit_breaker/wrapper.py
Show resolved
Hide resolved
key-value/key-value-aio/src/key_value/aio/wrappers/rate_limit/wrapper.py
Outdated
Show resolved
Hide resolved
| # Special keys used to store version information | ||
| _VERSION_KEY = "__version__" | ||
| _VERSIONED_DATA_KEY = "__versioned_data__" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧹 Nitpick | 🔵 Trivial
Consider namespace collision risk with dunder-style keys.
The keys __version__ and __versioned_data__ use Python's dunder (double-underscore) convention, which is typically reserved for special methods and attributes. While these are stored in user data (not Python attributes), users might legitimately use __version__ in their data structures, causing conflicts.
Consider using a more unique prefix like _kv_version_ or __kv_meta_version__ to reduce collision risk with user data.
🤖 Prompt for AI Agents
In key-value/key-value-aio/src/key_value/aio/wrappers/versioning/wrapper.py
around lines 9–11 the module-level keys _VERSION_KEY and _VERSIONED_DATA_KEY use
dunder-style names ("__version__", "__versioned_data__") which risks colliding
with legitimate user data; replace these constants with a more unique prefix
(e.g. "_kv_version_" and "_kv_versioned_data_" or
"__kv_meta_version__"/"__kv_meta_versioned_data__") and update every reference
in the codebase (imports, uses, tests, and docs) to the new constants so
storage/lookup remains consistent and collisions are avoided.
| def __init__( | ||
| self, | ||
| key_value: AsyncKeyValue, | ||
| version: str | int, | ||
| ) -> None: | ||
| """Initialize the versioning wrapper. | ||
| Args: | ||
| key_value: The store to wrap. | ||
| version: The version identifier to tag values with. Can be string (e.g., "v1.2.0") or int (e.g., 1). | ||
| """ | ||
| self.key_value: AsyncKeyValue = key_value | ||
| self.version: str | int = version | ||
|
|
||
| super().__init__() | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧹 Nitpick | 🔵 Trivial
Consider validating the version parameter.
The version parameter accepts any str | int without validation. Empty strings or potentially confusing values (e.g., empty string, None-like strings) could lead to unexpected behavior.
Consider adding validation:
def __init__(
self,
key_value: AsyncKeyValue,
version: str | int,
) -> None:
"""Initialize the versioning wrapper.
Args:
key_value: The store to wrap.
version: The version identifier to tag values with. Can be string (e.g., "v1.2.0") or int (e.g., 1).
"""
+ if isinstance(version, str) and not version:
+ raise ValueError("Version string cannot be empty")
self.key_value: AsyncKeyValue = key_value
self.version: str | int = version
super().__init__()📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| def __init__( | |
| self, | |
| key_value: AsyncKeyValue, | |
| version: str | int, | |
| ) -> None: | |
| """Initialize the versioning wrapper. | |
| Args: | |
| key_value: The store to wrap. | |
| version: The version identifier to tag values with. Can be string (e.g., "v1.2.0") or int (e.g., 1). | |
| """ | |
| self.key_value: AsyncKeyValue = key_value | |
| self.version: str | int = version | |
| super().__init__() | |
| def __init__( | |
| self, | |
| key_value: AsyncKeyValue, | |
| version: str | int, | |
| ) -> None: | |
| """Initialize the versioning wrapper. | |
| Args: | |
| key_value: The store to wrap. | |
| version: The version identifier to tag values with. Can be string (e.g., "v1.2.0") or int (e.g., 1). | |
| """ | |
| if isinstance(version, str) and not version: | |
| raise ValueError("Version string cannot be empty") | |
| self.key_value: AsyncKeyValue = key_value | |
| self.version: str | int = version | |
| super().__init__() |
🤖 Prompt for AI Agents
In key-value/key-value-aio/src/key_value/aio/wrappers/versioning/wrapper.py
around lines 46 to 61, the constructor accepts a version of type str | int
without validation which can allow empty or invalid values; add explicit
validation before assigning self.version: if version is an int ensure it's
within an acceptable range (e.g., non-negative or >0 per project rules), if
version is a str strip whitespace and reject empty strings or known "None-like"
values, otherwise raise a ValueError with a clear message; perform these checks
and only after validation assign self.version and proceed with super().__init__.
key-value/key-value-aio/src/key_value/aio/wrappers/versioning/wrapper.py
Show resolved
Hide resolved
key-value/key-value-aio/src/key_value/aio/wrappers/versioning/wrapper.py
Outdated
Show resolved
Hide resolved
key-value/key-value-aio/src/key_value/aio/wrappers/versioning/wrapper.py
Outdated
Show resolved
Hide resolved
|
@claude please review the feedback provided for potential inclusion |
|
Claude finished @strawgate's task —— View job CodeRabbit Feedback Review - Complete ✅I've analyzed CodeRabbit's review and all four wrapper implementations. Here's my assessment of which issues should be addressed: 🔴 Critical Issues - MUST FIX1. BulkheadWrapper: Broken
|
|
@claude can you please review and incorporate feedback? |
|
Claude finished @strawgate's task —— View job CodeRabbit Feedback Review - Complete ✅I've analyzed CodeRabbit's review and all four wrapper implementations. Here's my assessment of which issues should be addressed: 🔴 Critical Issues - MUST FIX1. BulkheadWrapper: Broken
|
|
@claude can you please review and incorporate feedback? We've made a lot of changes on main so please review the code to make sure it matches our new patterns. |
|
I'll analyze this and get back to you. |
Test Failure AnalysisSummary: The CI Root Cause: The PR adds four new wrapper implementations to the async library (
The CI workflow enforces this by running Suggested Solution: Run
This will add all 16 missing files and make the CI check pass. Detailed AnalysisCI Failure LogWhy These Files Are RequiredThe py-key-value project maintains two parallel libraries:
When you add new async wrappers, the codegen process creates:
All generated files must be committed alongside the async changes to maintain consistency between the two libraries. Related FilesFiles Added by This PR (Async)
Files That Need to Be Added (Sync - Auto-generated)All 16 files listed in the "Suggested Solution" section above. Relevant Project Files
Note: This is a common requirement in the py-key-value project. The CI ensures that generated code is always up-to-date and committed. After running |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 6
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (5)
key-value/key-value-sync/src/key_value/sync/code_gen/wrappers/versioning/__init__.py(1 hunks)key-value/key-value-sync/src/key_value/sync/code_gen/wrappers/versioning/wrapper.py(1 hunks)key-value/key-value-sync/src/key_value/sync/wrappers/versioning/__init__.py(1 hunks)key-value/key-value-sync/tests/code_gen/stores/wrappers/test_versioning.py(1 hunks)scripts/build_sync_library.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
key-value/key-value-sync/src/key_value/sync/wrappers/versioning/__init__.py (1)
key-value/key-value-sync/src/key_value/sync/code_gen/wrappers/versioning/wrapper.py (1)
VersioningWrapper(17-118)
key-value/key-value-sync/src/key_value/sync/code_gen/wrappers/versioning/__init__.py (1)
key-value/key-value-sync/src/key_value/sync/code_gen/wrappers/versioning/wrapper.py (1)
VersioningWrapper(17-118)
key-value/key-value-sync/tests/code_gen/stores/wrappers/test_versioning.py (1)
key-value/key-value-sync/src/key_value/sync/code_gen/wrappers/versioning/wrapper.py (7)
VersioningWrapper(17-118)put(109-111)get(87-89)get_many(92-94)ttl(97-101)ttl_many(104-106)put_many(114-118)
key-value/key-value-sync/src/key_value/sync/code_gen/wrappers/versioning/wrapper.py (1)
key-value/key-value-sync/src/key_value/sync/code_gen/protocols/key_value.py (1)
KeyValue(178-183)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
- GitHub Check: test_quick (3.10, windows-2022, key-value/key-value-sync)
- GitHub Check: test_quick (3.10, windows-latest, key-value/key-value-aio)
- GitHub Check: test_quick (3.10, windows-latest, key-value/key-value-sync)
- GitHub Check: test_quick (3.10, windows-2022, key-value/key-value-aio)
- GitHub Check: test_quick (3.10, macos-latest, key-value/key-value-sync)
- GitHub Check: test_quick (3.10, macos-14, key-value/key-value-sync)
- GitHub Check: test_quick (3.10, ubuntu-latest, key-value/key-value-sync)
- GitHub Check: test_quick (3.10, ubuntu-22.04, key-value/key-value-aio)
- GitHub Check: test_quick (3.10, ubuntu-22.04, key-value/key-value-sync)
- GitHub Check: test_quick (3.10, ubuntu-latest, key-value/key-value-aio)
🔇 Additional comments (9)
scripts/build_sync_library.py (2)
53-55: Code changes at lines 53-55 are correct and consistent.Verification confirms that all three new test file exclusions (bulkhead, circuit_breaker, rate_limit) exist in the async project and are properly aligned with their corresponding directory exclusions in EXCLUDE_DIRECTORIES. The exclusions follow the established pattern and maintain full consistency across the configuration. Additionally, the timeout wrapper (line 52 test, line 63 directory) is also consistently excluded throughout.
63-65: The original review comment is based on a misunderstanding of the exclusion mechanism.Items in
EXCLUDE_DIRECTORIESare explicitly excluded FROM sync conversion (remaining async-only). The versioning wrapper is intentionally NOT in this list, meaning it IS being synced to the sync library—which is correct and confirmed by the auto-generated sync files that already exist.The three excluded wrappers (bulkhead, circuit_breaker, rate_limit) remain async-only as intended, while the versioning wrapper is properly synced alongside the other async wrappers.
Likely an incorrect or invalid review comment.
key-value/key-value-sync/src/key_value/sync/code_gen/wrappers/versioning/wrapper.py (4)
49-60: LGTM!The initialization correctly stores the wrapped key-value store and version identifier, following the standard wrapper pattern.
86-94: LGTM!Both
getandget_manycorrectly delegate to the wrapped store and unwrap the results.
96-101: LGTM!The
ttlmethod correctly handles version mismatches by returningNonefor both value and TTL when versions don't match.
108-118: LGTM!Both
putandput_manycorrectly wrap values with version metadata before storing. The conversion fromMappingtodictensures proper wrapping.key-value/key-value-sync/src/key_value/sync/wrappers/versioning/__init__.py (1)
1-6: Re-export wiring looks consistentMatches the existing sync wrapper pattern and keeps the public surface aligned with the generated implementation.
key-value/key-value-sync/src/key_value/sync/code_gen/wrappers/versioning/__init__.py (1)
1-6: Codegen export matches implementationAuto-generated module cleanly re-exports the concrete wrapper, keeping the code-gen surface coherent.
key-value/key-value-sync/tests/code_gen/stores/wrappers/test_versioning.py (1)
12-183: Comprehensive coverage confirmedThe suite exercises version tagging, mismatches, TTL propagation, bulk ops, and schema evolution scenarios, giving strong confidence in the wrapper’s behavior.
| Example: | ||
| # Version 1 of your application | ||
| store_v1 = VersioningWrapper(key_value=store, version="v1") | ||
| await store_v1.put(key="user:123", value={"name": "John", "email": "[email protected]"}) | ||
| # Version 2 changes the schema (adds "age" field) | ||
| store_v2 = VersioningWrapper(key_value=store, version="v2") | ||
| result = await store_v2.get(key="user:123") | ||
| # Returns None because version mismatch, forcing reload with new schema |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Incorrect async syntax in synchronous code documentation.
The example code in the docstring uses await keywords, which is invalid for synchronous code. Since this is an auto-generated file, the build script should be updated to remove or convert async syntax from docstrings when generating sync versions.
The examples should be:
# Version 1 of your application
store_v1 = VersioningWrapper(key_value=store, version="v1")
store_v1.put(key="user:123", value={"name": "John", "email": "[email protected]"})
# Version 2 changes the schema (adds "age" field)
store_v2 = VersioningWrapper(key_value=store, version="v2")
result = store_v2.get(key="user:123")
# Returns None because version mismatch, forcing reload with new schema🤖 Prompt for AI Agents
In
key-value/key-value-sync/src/key_value/sync/code_gen/wrappers/versioning/wrapper.py
around lines 38 to 46 the generated docstring examples still contain async
syntax (await) which is invalid for the synchronous build; update the
codegen/template that emits sync wrappers to remove or convert async syntax in
docstrings—specifically change generator logic to emit non-async call examples
(drop "await" and use direct .put/.get calls) or conditionally switch to a sync
example template when producing sync artifacts, and regenerate the file so the
docstring examples show synchronous usage without await.
key-value/key-value-sync/src/key_value/sync/code_gen/wrappers/versioning/wrapper.py
Show resolved
Hide resolved
key-value/key-value-sync/src/key_value/sync/code_gen/wrappers/versioning/wrapper.py
Show resolved
Hide resolved
| return None | ||
|
|
||
| # Extract the actual data | ||
| return value.get(_VERSIONED_DATA_KEY, value) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Defensive fallback could return unexpected structure.
If _VERSIONED_DATA_KEY is missing from a versioned value, the fallback returns the entire value dict including the __version__ key. This could expose internal versioning metadata to callers if the stored data is malformed.
Consider whether this should raise an error instead, or at least log a warning about malformed versioned data.
🤖 Prompt for AI Agents
In
key-value/key-value-sync/src/key_value/sync/code_gen/wrappers/versioning/wrapper.py
around line 84, the current fallback returns the whole dict when
_VERSIONED_DATA_KEY is missing, which can leak internal __version__ metadata;
change the logic to detect a missing or malformed versioned payload and raise a
clear exception (e.g., ValueError or a custom error) instead of returning the
raw dict, and add a short log.warn or log.error before raising so callers can
see the malformed entry; update or add a unit test that verifies the exception
is raised when _VERSIONED_DATA_KEY is absent.
key-value/key-value-sync/src/key_value/sync/code_gen/wrappers/versioning/wrapper.py
Outdated
Show resolved
Hide resolved
| PATHS_TO_LINT = [SYNC_PROJECT_MODULE_DIR, SYNC_PROJECT_TESTS_DIR] | ||
| EXCLUDE_FILES = [ | ||
| "key-value/key-value-aio/src/key_value/aio/__init__.py", | ||
| "key-value/key-value-aio/tests/stores/wrappers/test_timeout.py", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inconsistency: timeout test is excluded but timeout wrapper directory is not.
The test_timeout.py file remains in EXCLUDE_FILES, but according to the changes, the timeout wrapper directory has been removed from EXCLUDE_DIRECTORIES (line 62). This creates an inconsistency where the timeout wrapper code will be converted to sync, but its tests will not be.
Either:
- Remove
test_timeout.pyfromEXCLUDE_FILESif timeout wrapper should be converted to sync (including tests), or - Re-add the timeout directory to
EXCLUDE_DIRECTORIESif it should remain async-only
Apply this diff if option 1 is correct (convert timeout tests to sync):
"key-value/key-value-aio/src/key_value/aio/__init__.py",
- "key-value/key-value-aio/tests/stores/wrappers/test_timeout.py",
"key-value/key-value-aio/tests/stores/wrappers/test_bulkhead.py",Or apply this diff if option 2 is correct (keep timeout async-only):
"key-value/key-value-aio/src/key_value/aio/stores/memcached",
"key-value/key-value-aio/tests/stores/memcached",
+ "key-value/key-value-aio/src/key_value/aio/wrappers/timeout",
"key-value/key-value-aio/src/key_value/aio/wrappers/bulkhead",Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In scripts/build_sync_library.py around line 52, EXCLUDE_FILES still lists
"key-value/key-value-aio/tests/stores/wrappers/test_timeout.py" while the
timeout wrapper directory was removed from EXCLUDE_DIRECTORIES, causing tests to
be excluded but code converted; either remove that test file entry from
EXCLUDE_FILES to include the timeout tests in the sync conversion, or re-add the
timeout wrapper directory back into EXCLUDE_DIRECTORIES to keep the wrapper
async-only—make the change consistent by updating the appropriate list
accordingly and run the sync build to verify no mismatched exclusions remain.
Test Failure AnalysisSummary: Five test cases are failing in the new wrapper implementations due to race conditions and off-by-one errors in concurrency control logic. Root Cause: The failures stem from two distinct issues in the newly added resilience wrappers:
Suggested Solution: Fix 1: BulkheadWrapper (key-value-aio/src/key_value/aio/wrappers/bulkhead/wrapper.py)The Lines 66-90 - Replace the entire async def _execute_with_bulkhead(self, operation: Callable[..., Coroutine[Any, Any, T]], *args: Any, **kwargs: Any) -> T:
"""Execute an operation with bulkhead resource limiting."""
# Check if we can accept this operation
async with self._waiting_lock:
if self._waiting_count >= self.max_waiting:
raise BulkheadFullError(max_concurrent=self.max_concurrent, max_waiting=self.max_waiting)
self._waiting_count += 1
try:
# Acquire semaphore to limit concurrency
async with self._semaphore:
# Once we have the semaphore, we're no longer waiting
async with self._waiting_lock:
self._waiting_count -= 1
# Execute the operation
return await operation(*args, **kwargs)
except BaseException:
# Make sure to decrement waiting count if we fail before acquiring semaphore
# If we already acquired the semaphore, the count was already decremented above
async with self._waiting_lock:
# Check if decrement already happened (count should be 0 or positive)
# If we never got the semaphore, we need to decrement
if self._waiting_count > 0:
# Only decrement if we haven't already decremented
# This is tricky - we can check if we're past the semaphore acquisition
pass # Actually, the real fix is to track this differently
raiseActually, the proper fix is simpler - use a try/finally block: async def _execute_with_bulkhead(self, operation: Callable[..., Coroutine[Any, Any, T]], *args: Any, **kwargs: Any) -> T:
"""Execute an operation with bulkhead resource limiting."""
# Try to acquire a slot in the waiting queue
async with self._waiting_lock:
if self._waiting_count >= self.max_waiting:
raise BulkheadFullError(max_concurrent=self.max_concurrent, max_waiting=self.max_waiting)
self._waiting_count += 1
acquired_semaphore = False
try:
# Acquire semaphore to limit concurrency
await self._semaphore.acquire()
acquired_semaphore = True
# Once we have the semaphore, we're no longer waiting
async with self._waiting_lock:
self._waiting_count -= 1
# Execute the operation
return await operation(*args, **kwargs)
finally:
# Always clean up properly
if acquired_semaphore:
self._semaphore.release()
else:
# We never got the semaphore, so decrement waiting count
async with self._waiting_lock:
self._waiting_count -= 1Fix 2: RateLimitWrapper (key-value-aio/src/key_value/aio/wrappers/rate_limit/wrapper.py)Line 83 - Change the comparison from # Before:
if len(self._request_times) >= self.max_requests:
# After:
if len(self._request_times) > self.max_requests:Wait, that's wrong. Let me reconsider... Actually, the issue is that the check happens BEFORE recording the request. The logic is:
This means we can have at most # Check if we're at the limit (after adding this request)
if len(self._request_times) >= self.max_requests:
raise RateLimitExceededError(...)
# Record this request
self._request_times.append(now)Should become: # Record this request first
self._request_times.append(now)
# Check if we exceeded the limit (after adding)
if len(self._request_times) > self.max_requests:
# Remove the request we just added since it exceeded the limit
self._request_times.pop()
raise RateLimitExceededError(...)After Making Changes
Detailed AnalysisFailed Tests
BulkheadWrapper Issue ExplanationThe bulkhead pattern should allow:
The test
However, the current implementation has a race condition where:
For RateLimitWrapper Issue ExplanationThe rate limiter uses a sliding window of request timestamps. The current logic: # Check if we're at the limit
if len(self._request_times) >= self.max_requests: # Line 83
raise RateLimitExceededError(...)
# Record this request
self._request_times.append(now) # Line 89This rejects the request when we have Example with
The fix is to either:
Option 2 is cleaner and more intuitive. Related FilesFiles Requiring FixesAsync Library:
Sync Library (will be auto-generated):
Test Files (for reference)
Note: This analysis is from a fresh workflow run (#19213534445) that occurred after the codegen files were added. The previous codegen issue has been resolved - these are legitimate implementation bugs in the wrapper logic discovered by the test suite. |
|
@claude please update the PR with the required changes |
|
Claude finished @strawgate's task —— View job Implementation CompleteI've successfully implemented the CodeRabbit feedback and pushed the changes. Here's what was accomplished: ✅ Completed & Committed
|
Test Failure AnalysisSummary: 5 tests are failing in the new wrapper implementations (BulkheadWrapper and RateLimitWrapper). The failures indicate that these wrappers are incorrectly applying rate limiting and concurrency controls to the Root Cause: The new RateLimitWrapper and BulkheadWrapper are applying their protective mechanisms during standard test execution, which conflicts with tests that expect to perform many operations rapidly or concurrently. Specifically:
Suggested Solution: For RateLimitWrapper Test FailuresThe
# In test_rate_limit.py
@pytest.mark.skip(reason="Rate limiting intentionally bounds requests")
async def test_not_unbounded(self, store: BaseStore):
pass
@pytest.mark.skip(reason="Rate limiting affects concurrent operations")
async def test_concurrent_operations(self, store: BaseStore):
pass
@pytest.fixture
def store(self) -> RateLimitWrapper:
return RateLimitWrapper(
MemoryStore(),
max_requests=10000, # High enough for test_not_unbounded
window_seconds=60.0
)For For BulkheadWrapper Test Failures
Detailed AnalysisFailed Tests Summary (Ubuntu 22.04 run):Error Context:From for i in range(1000):
value = hashlib.sha256(f"test_{i}".encode()).hexdigest()
await store.put(collection="test_collection", key=f"test_key_{i}", value={"test": value})This test expects stores to handle 1000 sequential writes, but RateLimitWrapper with default config (100 requests/60s) blocks after 100 requests. Related Files
|
- Use time.monotonic() in CircuitBreakerWrapper and RateLimitWrapper to prevent issues with system clock adjustments - Fix VersioningWrapper double-wrap bypass by checking both version and data keys - Fix VersioningWrapper malformed data handling to return None instead of leaking metadata - Fix VersioningWrapper ttl_many to unwrap values only once for better performance - Fix circuit breaker test store to fail on all operations (get/put/delete) - Improve BulkheadWrapper waiting count tracking (WIP - tests still failing) - Fix RateLimitWrapper off-by-one error (WIP - tests still failing) Co-authored-by: William Easton <[email protected]>
|
Test Failure AnalysisSummary: The static analysis job failed with 6 type errors in Root Cause: The
This incompatibility prevents the test class from being properly recognized as implementing the Suggested Solution: Update the # Change from:
async def put(self, key: str, value: dict[str, Any], *, collection: str | None = None, ttl: float | None = None):
# To:
async def put(self, key: str, value: Mapping[str, Any], *, collection: str | None = None, ttl: SupportsFloat | None = None):You'll also need to add the import at the top of the file: from typing import Any, SupportsFloat # Add SupportsFloat
from collections.abc import Mapping # Add this import if not presentThis change ensures proper protocol compliance and resolves all 6 type errors. Detailed AnalysisError Locations:
Relevant Log Excerpt: Why This Fix Works:
Related Files
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
♻️ Duplicate comments (4)
key-value/key-value-aio/src/key_value/aio/wrappers/bulkhead/wrapper.py (1)
91-96: Double-decrement bug when operation fails after acquiring semaphore.The exception handler can incorrectly decrement
_waiting_counttwice for the same operation. When multiple operations are queued, the conditionif self._waiting_count > 0doesn't distinguish whether the current operation has already decremented its count (line 87) or not. For example:
- Operation A increments waiting_count to 1, acquires semaphore, decrements to 0, then fails during execution
- Operation B increments waiting_count to 1 (now it's 1 again from B)
- Operation A's exception handler checks
_waiting_count > 0(true, because of B), and incorrectly decrements to 0- Result: A has double-decremented; B's count is lost
Use a local flag to track whether this operation has already decremented, as suggested in the previous review.
Apply this diff to fix the double-decrement bug:
async def _execute_with_bulkhead(self, operation: Callable[..., Coroutine[Any, Any, T]], *args: Any, **kwargs: Any) -> T: """Execute an operation with bulkhead resource limiting.""" - # Check if we're over capacity before even trying - # Count the number currently executing + waiting + registered_waiting = False async with self._waiting_lock: - # _semaphore._value tells us how many slots are available - # max_concurrent - _value = number currently executing currently_executing = self.max_concurrent - self._semaphore._value total_in_system = currently_executing + self._waiting_count if total_in_system >= self.max_concurrent + self.max_waiting: raise BulkheadFullError(max_concurrent=self.max_concurrent, max_waiting=self.max_waiting) - # We're allowed in - increment waiting count self._waiting_count += 1 + registered_waiting = True try: - # Acquire semaphore (may block) async with self._semaphore: - # Once we have the semaphore, we're executing (not waiting) async with self._waiting_lock: self._waiting_count -= 1 + registered_waiting = False - # Execute the operation return await operation(*args, **kwargs) except BaseException: - # Make sure to clean up waiting count if we fail before executing - async with self._waiting_lock: - if self._waiting_count > 0: - self._waiting_count -= 1 + if registered_waiting: + async with self._waiting_lock: + self._waiting_count -= 1 raisekey-value/key-value-sync/src/key_value/sync/code_gen/wrappers/versioning/wrapper.py (2)
38-46: Fix incorrect async syntax in sync code documentation.The example code uses
awaitkeywords (lines 41, 45), which is invalid for this synchronous wrapper. Users copying these examples will encounterSyntaxError.Since this is an auto-generated file, update the
build_sync_library.pyscript to stripawaitkeywords from docstring examples when generating sync versions:# Example fix for build_sync_library.py (pseudo-code) # In the docstring transformation logic: docstring = re.sub(r'\bawait\s+', '', docstring)The corrected examples should be:
# Version 1 of your application store_v1 = VersioningWrapper(key_value=store, version="v1") store_v1.put(key="user:123", value={"name": "John", "email": "[email protected]"}) # Version 2 changes the schema (adds "age" field) store_v2 = VersioningWrapper(key_value=store, version="v2") result = store_v2.get(key="user:123") # Returns None because version mismatch, forcing reload with new schema
49-59: Add version validation (same issue as async version).This constructor has the same validation gap as the async version (lines 46-60 in the async file). Since this file is auto-generated, fixing the validation in the source async file will automatically resolve this issue here.
Refer to the review comment on the async file for the suggested validation logic.
key-value/key-value-aio/src/key_value/aio/wrappers/versioning/wrapper.py (1)
46-60: Add validation for the version parameter.The
versionparameter accepts anystr | intwithout validation. Empty strings, whitespace-only strings, or potentially invalid values could lead to unexpected behavior or collisions.Apply this diff to add validation:
def __init__( self, key_value: AsyncKeyValue, version: str | int, ) -> None: """Initialize the versioning wrapper. Args: key_value: The store to wrap. version: The version identifier to tag values with. Can be string (e.g., "v1.2.0") or int (e.g., 1). """ + if isinstance(version, str): + version = version.strip() + if not version: + raise ValueError("Version string cannot be empty or whitespace-only") + elif isinstance(version, int): + if version < 0: + raise ValueError("Version integer must be non-negative") + self.key_value: AsyncKeyValue = key_value self.version: str | int = version super().__init__()
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (6)
key-value/key-value-aio/src/key_value/aio/wrappers/bulkhead/wrapper.py(1 hunks)key-value/key-value-aio/src/key_value/aio/wrappers/circuit_breaker/wrapper.py(1 hunks)key-value/key-value-aio/src/key_value/aio/wrappers/rate_limit/wrapper.py(1 hunks)key-value/key-value-aio/src/key_value/aio/wrappers/versioning/wrapper.py(1 hunks)key-value/key-value-aio/tests/stores/wrappers/test_circuit_breaker.py(1 hunks)key-value/key-value-sync/src/key_value/sync/code_gen/wrappers/versioning/wrapper.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (6)
key-value/key-value-aio/src/key_value/aio/wrappers/rate_limit/wrapper.py (3)
key-value/key-value-shared/src/key_value/shared/errors/wrappers/rate_limit.py (1)
RateLimitExceededError(4-11)key-value/key-value-aio/src/key_value/aio/protocols/key_value.py (1)
AsyncKeyValue(185-190)key-value/key-value-aio/src/key_value/aio/wrappers/bulkhead/wrapper.py (8)
get(99-100)get_many(103-104)ttl(107-108)ttl_many(111-112)put(115-116)put_many(119-127)delete(130-131)delete_many(134-135)
key-value/key-value-aio/tests/stores/wrappers/test_circuit_breaker.py (2)
key-value/key-value-shared/src/key_value/shared/errors/wrappers/circuit_breaker.py (1)
CircuitOpenError(4-11)key-value/key-value-aio/src/key_value/aio/wrappers/circuit_breaker/wrapper.py (6)
CircuitBreakerWrapper(23-174)CircuitState(15-20)get(138-139)put(154-155)ttl(146-147)delete(169-170)
key-value/key-value-aio/src/key_value/aio/wrappers/circuit_breaker/wrapper.py (2)
key-value/key-value-shared/src/key_value/shared/errors/wrappers/circuit_breaker.py (1)
CircuitOpenError(4-11)key-value/key-value-aio/src/key_value/aio/protocols/key_value.py (1)
AsyncKeyValue(185-190)
key-value/key-value-aio/src/key_value/aio/wrappers/versioning/wrapper.py (2)
key-value/key-value-aio/src/key_value/aio/protocols/key_value.py (1)
AsyncKeyValue(185-190)key-value/key-value-sync/src/key_value/sync/code_gen/wrappers/versioning/wrapper.py (9)
VersioningWrapper(17-122)_wrap_value(61-67)_unwrap_value(69-87)get(90-92)get_many(95-97)ttl(100-104)ttl_many(107-110)put(113-115)put_many(118-122)
key-value/key-value-aio/src/key_value/aio/wrappers/bulkhead/wrapper.py (3)
key-value/key-value-shared/src/key_value/shared/errors/wrappers/bulkhead.py (1)
BulkheadFullError(4-11)key-value/key-value-aio/src/key_value/aio/protocols/key_value.py (1)
AsyncKeyValue(185-190)key-value/key-value-aio/tests/stores/wrappers/test_bulkhead.py (2)
get(21-28)get(137-139)
key-value/key-value-sync/src/key_value/sync/code_gen/wrappers/versioning/wrapper.py (2)
key-value/key-value-sync/src/key_value/sync/code_gen/protocols/key_value.py (1)
KeyValue(178-183)key-value/key-value-aio/src/key_value/aio/wrappers/versioning/wrapper.py (9)
VersioningWrapper(14-128)_wrap_value(62-68)_unwrap_value(70-88)get(91-93)get_many(96-98)ttl(101-105)ttl_many(108-111)put(114-116)put_many(119-128)
🪛 GitHub Actions: Run Tests
key-value/key-value-aio/tests/stores/wrappers/test_circuit_breaker.py
[error] 32-32: uv run basedpyright: Method "put" overrides class "BaseStore" in an incompatible manner. Parameter 3 type mismatch: base parameter is type "Mapping[str, Any]", override parameter is type "dict[str, Any]". Keyword parameter "ttl" type mismatch: base parameter is type "SupportsFloat | None", override parameter is type "float | None". "Mapping[str, Any]" is not assignable to "dict[str, Any]". Type "SupportsFloat | None" is not assignable to type "float | None".
🪛 GitHub Check: static_analysis (key-value/key-value-aio)
key-value/key-value-aio/tests/stores/wrappers/test_circuit_breaker.py
[failure] 201-201:
Argument of type "IntermittentlyFailingStore" cannot be assigned to parameter "key_value" of type "AsyncKeyValue" in function "init"
"IntermittentlyFailingStore" is incompatible with protocol "AsyncKeyValue"
"put" is an incompatible type
Type "(key: str, value: dict[str, Any], *, collection: str | None = None, ttl: float | None = None) -> CoroutineType[Any, Any, None]" is not assignable to type "(key: str, value: Mapping[str, Any], *, collection: str | None = None, ttl: SupportsFloat | None = None) -> CoroutineType[Any, Any, None]"
Parameter 2: type "Mapping[str, Any]" is incompatible with type "dict[str, Any]"
"Mapping[str, Any]" is not assignable to "dict[str, Any]"
Keyword parameter "ttl" of type "SupportsFloat | None" is incompatible with type "float | None"
Type "SupportsFloat | None" is not assignable to type "float | None" (reportArgumentType)
[failure] 152-152:
Argument of type "IntermittentlyFailingStore" cannot be assigned to parameter "key_value" of type "AsyncKeyValue" in function "init"
"IntermittentlyFailingStore" is incompatible with protocol "AsyncKeyValue"
"put" is an incompatible type
Type "(key: str, value: dict[str, Any], *, collection: str | None = None, ttl: float | None = None) -> CoroutineType[Any, Any, None]" is not assignable to type "(key: str, value: Mapping[str, Any], *, collection: str | None = None, ttl: SupportsFloat | None = None) -> CoroutineType[Any, Any, None]"
Parameter 2: type "Mapping[str, Any]" is incompatible with type "dict[str, Any]"
"Mapping[str, Any]" is not assignable to "dict[str, Any]"
Keyword parameter "ttl" of type "SupportsFloat | None" is incompatible with type "float | None"
Type "SupportsFloat | None" is not assignable to type "float | None" (reportArgumentType)
[failure] 110-110:
Argument of type "IntermittentlyFailingStore" cannot be assigned to parameter "key_value" of type "AsyncKeyValue" in function "init"
"IntermittentlyFailingStore" is incompatible with protocol "AsyncKeyValue"
"put" is an incompatible type
Type "(key: str, value: dict[str, Any], *, collection: str | None = None, ttl: float | None = None) -> CoroutineType[Any, Any, None]" is not assignable to type "(key: str, value: Mapping[str, Any], *, collection: str | None = None, ttl: SupportsFloat | None = None) -> CoroutineType[Any, Any, None]"
Parameter 2: type "Mapping[str, Any]" is incompatible with type "dict[str, Any]"
"Mapping[str, Any]" is not assignable to "dict[str, Any]"
Keyword parameter "ttl" of type "SupportsFloat | None" is incompatible with type "float | None"
Type "SupportsFloat | None" is not assignable to type "float | None" (reportArgumentType)
[failure] 85-85:
Argument of type "IntermittentlyFailingStore" cannot be assigned to parameter "key_value" of type "AsyncKeyValue" in function "init"
"IntermittentlyFailingStore" is incompatible with protocol "AsyncKeyValue"
"put" is an incompatible type
Type "(key: str, value: dict[str, Any], *, collection: str | None = None, ttl: float | None = None) -> CoroutineType[Any, Any, None]" is not assignable to type "(key: str, value: Mapping[str, Any], *, collection: str | None = None, ttl: SupportsFloat | None = None) -> CoroutineType[Any, Any, None]"
Parameter 2: type "Mapping[str, Any]" is incompatible with type "dict[str, Any]"
"Mapping[str, Any]" is not assignable to "dict[str, Any]"
Keyword parameter "ttl" of type "SupportsFloat | None" is incompatible with type "float | None"
Type "SupportsFloat | None" is not assignable to type "float | None" (reportArgumentType)
[failure] 64-64:
Argument of type "IntermittentlyFailingStore" cannot be assigned to parameter "key_value" of type "AsyncKeyValue" in function "init"
"IntermittentlyFailingStore" is incompatible with protocol "AsyncKeyValue"
"put" is an incompatible type
Type "(key: str, value: dict[str, Any], *, collection: str | None = None, ttl: float | None = None) -> CoroutineType[Any, Any, None]" is not assignable to type "(key: str, value: Mapping[str, Any], *, collection: str | None = None, ttl: SupportsFloat | None = None) -> CoroutineType[Any, Any, None]"
Parameter 2: type "Mapping[str, Any]" is incompatible with type "dict[str, Any]"
"Mapping[str, Any]" is not assignable to "dict[str, Any]"
Keyword parameter "ttl" of type "SupportsFloat | None" is incompatible with type "float | None"
Type "SupportsFloat | None" is not assignable to type "float | None" (reportArgumentType)
[failure] 32-32:
Method "put" overrides class "BaseStore" in an incompatible manner
Parameter 3 type mismatch: base parameter is type "Mapping[str, Any]", override parameter is type "dict[str, Any]"
Keyword parameter "ttl" type mismatch: base parameter is type "SupportsFloat | None", override parameter is type "float | None"
"Mapping[str, Any]" is not assignable to "dict[str, Any]"
Type "SupportsFloat | None" is not assignable to type "float | None"
Type "SupportsFloat" is not assignable to type "float | None"
"SupportsFloat" is not assignable to "float"
"SupportsFloat" is not assignable to "None" (reportIncompatibleMethodOverride)
🔇 Additional comments (24)
key-value/key-value-aio/src/key_value/aio/wrappers/circuit_breaker/wrapper.py (2)
80-81: LGTM - Past review feedback addressed.The implementation now correctly uses
time.monotonic()for recovery timeout calculations (line 89, 112) while preserving wall-clock timestamps for diagnostics (line 80, 111). This prevents issues when the system clock is adjusted and ensures reliable circuit breaker behavior.Also applies to: 89-89, 111-112
23-174: Excellent implementation - production-ready circuit breaker.The CircuitBreakerWrapper implementation is well-structured and follows established patterns:
- Correct state machine with CLOSED → OPEN → HALF_OPEN → CLOSED transitions
- Proper error-type filtering distinguishes transient failures from other exceptions
- All AsyncKeyValue protocol methods correctly delegated through
_execute_with_circuit_breaker- Comprehensive docstring with clear usage examples
The resilience pattern is essential for preventing cascading failures in production systems.
key-value/key-value-aio/src/key_value/aio/wrappers/bulkhead/wrapper.py (3)
1-12: LGTM!The imports and TypeVar setup are appropriate for the wrapper's functionality.
14-64: LGTM!The class documentation is comprehensive, and the initialization correctly sets up the semaphore and waiting-queue tracking infrastructure.
98-135: LGTM!All protocol method implementations correctly delegate to
_execute_with_bulkheadwith proper argument forwarding. The signatures match theAsyncKeyValueprotocol, and the@overridedecorators are appropriately used.key-value/key-value-aio/src/key_value/aio/wrappers/versioning/wrapper.py (7)
14-44: Well-documented wrapper with clear use cases.The docstring provides excellent documentation with practical examples. The
awaitsyntax is correct for this async implementation.
62-68: Good improvement: proper double-wrap prevention.The check for both
_VERSION_KEYand_VERSIONED_DATA_KEYcorrectly identifies already-wrapped values and prevents user data with__version__from bypassing wrapping.
70-88: Good improvement: malformed data now handled correctly.Lines 84-87 now properly handle missing
_VERSIONED_DATA_KEYby returningNoneinstead of leaking internal metadata. This prevents corruption from being masked.
74-76: Verify the design choice for unversioned data handling.Lines 75-76 return unversioned data as-is, while lines 79-82 invalidate version-mismatched data by returning
None. This creates different behavior for:
- Unversioned data (no
__version__key) → returned as-is- Wrong version data → invalidated (None)
This may be intentional for backward compatibility (allowing migration from unversioned to versioned storage), but the asymmetry could be confusing. Consider documenting this behavior explicitly in the docstring or adding a configuration parameter (e.g.,
strict_versioning: bool) to control whether unversioned data should also be invalidated.
90-98: LGTM: Clean delegation pattern.The
getandget_manymethods correctly delegate to the underlying store and apply unwrapping logic.
100-111: Excellent: Efficient unwrapping in TTL methods.Lines 103-105 and 110-111 now correctly unwrap values once and reuse the result, eliminating the inefficiency from previous versions. The TTL is properly set to
Nonewhen the value is invalidated by version mismatch.
113-128: LGTM: Proper wrapping and delegation.Both
putandput_manycorrectly convertMappingtodict, wrap with version information, and delegate to the underlying store.key-value/key-value-sync/src/key_value/sync/code_gen/wrappers/versioning/wrapper.py (6)
1-3: Good: Clear auto-generation warning.The warning properly indicates this file is auto-generated and directs developers to modify the source file instead.
61-67: LGTM: Correctly prevents double-wrapping.The sync version correctly implements the same improved double-wrap prevention logic as the async version.
69-87: LGTM: Proper handling of malformed versioned data.Lines 84-87 correctly handle missing
_VERSIONED_DATA_KEYby returningNone. The unversioned data behavior (lines 75-76) matches the async version's design choice—see the corresponding review comment on the async file for discussion.
89-97: LGTM: Correct sync implementation.The
getandget_manymethods are properly generated as synchronous versions withoutawait.
99-110: LGTM: Efficient TTL handling.Both
ttlandttl_manyefficiently unwrap values once and properly set TTL toNonefor invalidated values, matching the async implementation.
112-122: LGTM: Proper sync implementation of put operations.Both
putandput_manyare correctly generated as synchronous versions, with proper wrapping and delegation.key-value/key-value-aio/src/key_value/aio/wrappers/rate_limit/wrapper.py (6)
1-41: LGTM - Clear documentation and proper imports.The class definition and docstring are well-structured with comprehensive examples and clear explanations of the two rate-limiting strategies.
43-71: LGTM - Proper initialization of rate limiting state.The initialization correctly sets up state for both strategies. Maintaining separate state for each strategy is a reasonable trade-off for code clarity.
73-91: Past review concern addressed - monotonic clock now used.The previous review flagged the use of
time.time()which could break on system clock adjustments. This has been correctly fixed on line 76 by switching totime.monotonic(). The sliding window logic is sound: it optimistically records the request, then rolls back if the limit is exceeded.
93-112: Past review concern addressed - monotonic clock now used.Line 96 correctly uses
time.monotonic()(addressing the previous review). The fixed window logic mirrors the sliding window approach with proper rollback semantics when limits are exceeded.
114-119: LGTM - Clean strategy dispatcher.The dispatcher correctly routes to the appropriate rate limiting strategy.
121-166: LGTM - All protocol methods correctly implemented.All eight protocol methods follow a consistent and correct pattern: check rate limit first, then delegate to the underlying store. The use of
@overridedecorators and keyword-only arguments aligns with the codebase patterns.Note: Bulk operations (
get_many,put_many, etc.) count as single requests regardless of the number of keys/values processed. This is correct behavior for request-based rate limiting and aligns with how most API rate limiters work.
| async with self._waiting_lock: | ||
| # _semaphore._value tells us how many slots are available | ||
| # max_concurrent - _value = number currently executing | ||
| currently_executing = self.max_concurrent - self._semaphore._value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid accessing private semaphore internals.
Accessing self._semaphore._value relies on private implementation details of asyncio.Semaphore that may change across Python versions. Consider using locked() or tracking concurrent operations with your own counter instead.
For example, track concurrent operations explicitly:
# In __init__:
self._executing_count: int = 0
# In _execute_with_bulkhead:
async with self._waiting_lock:
currently_executing = self._executing_count
total_in_system = currently_executing + self._waiting_count
if total_in_system >= self.max_concurrent + self.max_waiting:
raise BulkheadFullError(...)
self._waiting_count += 1
registered_waiting = True
try:
async with self._semaphore:
async with self._waiting_lock:
self._waiting_count -= 1
self._executing_count += 1
registered_waiting = False
try:
return await operation(*args, **kwargs)
finally:
async with self._waiting_lock:
self._executing_count -= 1
except BaseException:
if registered_waiting:
async with self._waiting_lock:
self._waiting_count -= 1
raise🤖 Prompt for AI Agents
In key-value/key-value-aio/src/key_value/aio/wrappers/bulkhead/wrapper.py around
line 73, the code reads self._semaphore._value which accesses private Semaphore
internals; replace this by tracking concurrent and waiting counts explicitly
under the existing _waiting_lock: add an _executing_count attribute initialized
in __init__, use _waiting_lock to read currently_executing and total_in_system
to enforce limits, increment _waiting_count when registering, then when entering
the semaphore decrement _waiting_count and increment _executing_count, and in a
finally block decrement _executing_count; also ensure any exception path that
aborts before acquiring the semaphore decrements _waiting_count under the lock
so counters remain consistent.
| async def put(self, key: str, value: dict[str, Any], *, collection: str | None = None, ttl: float | None = None): | ||
| self._check_and_maybe_fail() | ||
| return await super().put(key=key, value=value, collection=collection, ttl=ttl) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix protocol signature incompatibility.
The put method signature doesn't match the AsyncKeyValue protocol, causing type-checking failures and blocking CI.
Apply this diff:
- async def put(self, key: str, value: dict[str, Any], *, collection: str | None = None, ttl: float | None = None):
+ async def put(self, key: str, value: Mapping[str, Any], *, collection: str | None = None, ttl: SupportsFloat | None = None):
self._check_and_maybe_fail()
return await super().put(key=key, value=value, collection=collection, ttl=ttl)Add the missing import at the top of the file:
from typing import Any
+from typing import SupportsFloat
+from collections.abc import Mapping📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| async def put(self, key: str, value: dict[str, Any], *, collection: str | None = None, ttl: float | None = None): | |
| self._check_and_maybe_fail() | |
| return await super().put(key=key, value=value, collection=collection, ttl=ttl) | |
| from typing import Any, SupportsFloat | |
| from collections.abc import Mapping | |
| # ... rest of file ... | |
| async def put(self, key: str, value: Mapping[str, Any], *, collection: str | None = None, ttl: SupportsFloat | None = None): | |
| self._check_and_maybe_fail() | |
| return await super().put(key=key, value=value, collection=collection, ttl=ttl) |
🧰 Tools
🪛 GitHub Actions: Run Tests
[error] 32-32: uv run basedpyright: Method "put" overrides class "BaseStore" in an incompatible manner. Parameter 3 type mismatch: base parameter is type "Mapping[str, Any]", override parameter is type "dict[str, Any]". Keyword parameter "ttl" type mismatch: base parameter is type "SupportsFloat | None", override parameter is type "float | None". "Mapping[str, Any]" is not assignable to "dict[str, Any]". Type "SupportsFloat | None" is not assignable to type "float | None".
🪛 GitHub Check: static_analysis (key-value/key-value-aio)
[failure] 32-32:
Method "put" overrides class "BaseStore" in an incompatible manner
Parameter 3 type mismatch: base parameter is type "Mapping[str, Any]", override parameter is type "dict[str, Any]"
Keyword parameter "ttl" type mismatch: base parameter is type "SupportsFloat | None", override parameter is type "float | None"
"Mapping[str, Any]" is not assignable to "dict[str, Any]"
Type "SupportsFloat | None" is not assignable to type "float | None"
Type "SupportsFloat" is not assignable to type "float | None"
"SupportsFloat" is not assignable to "float"
"SupportsFloat" is not assignable to "None" (reportIncompatibleMethodOverride)
🤖 Prompt for AI Agents
In key-value/key-value-aio/tests/stores/wrappers/test_circuit_breaker.py around
lines 32 to 34, the put method uses dict[str, Any] but the file is missing the
Any import and the method signature must match the AsyncKeyValue protocol; add
the missing "Any" import from typing at the top of the file and update the put
signature to use the protocol-compatible type (e.g., value: Any) so the async
def put(self, key: str, value: Any, *, collection: str | None = None, ttl: float
| None = None) matches the AsyncKeyValue protocol and CI type checks pass.





Summary
This PR adds four new protocol-compliant wrappers to enhance py-key-value with production-ready resilience patterns and schema evolution capabilities.
New Wrappers
Testing
Implementation Notes
Closes #187
🤖 Generated with Claude Code) | View job run | [View branch](https://github.com/strawgate/py-key-value/tree/claude/issue-187-20251101-1758
Summary by CodeRabbit
New Features
Tests
Chores