1
+ import asyncio
1
2
from collections .abc import Awaitable , Callable , Generator
2
3
from functools import wraps
3
- from typing import NewType , ParamSpec , TypeVar , cast , final
4
+ from typing import Any , NewType , ParamSpec , TypeVar , cast , final
4
5
5
6
_ValueType = TypeVar ('_ValueType' )
6
7
_AwaitableT = TypeVar ('_AwaitableT' , bound = Awaitable )
@@ -19,6 +20,11 @@ class ReAwaitable:
19
20
So, in reality we still ``await`` once,
20
21
but pretending to do it multiple times.
21
22
23
+ This class is thread-safe and supports concurrent awaits from multiple
24
+ async tasks. When multiple tasks await the same instance simultaneously,
25
+ only one will execute the underlying coroutine while others will wait
26
+ and receive the cached result.
27
+
22
28
Why is that required? Because otherwise,
23
29
``Future`` containers would be unusable:
24
30
@@ -48,12 +54,13 @@ class ReAwaitable:
48
54
49
55
"""
50
56
51
- __slots__ = ('_cache' , '_coro' )
57
+ __slots__ = ('_cache' , '_coro' , '_lock' )
52
58
53
59
def __init__ (self , coro : Awaitable [_ValueType ]) -> None :
54
60
"""We need just an awaitable to work with."""
55
61
self ._coro = coro
56
62
self ._cache : _ValueType | _Sentinel = _sentinel
63
+ self ._lock : Any = None
57
64
58
65
def __await__ (self ) -> Generator [None , None , _ValueType ]:
59
66
"""
@@ -101,9 +108,34 @@ def __repr__(self) -> str:
101
108
102
109
async def _awaitable (self ) -> _ValueType :
103
110
"""Caches the once awaited value forever."""
104
- if self ._cache is _sentinel :
105
- self ._cache = await self ._coro
106
- return self ._cache # type: ignore
111
+ if self ._cache is not _sentinel :
112
+ return self ._cache # type: ignore
113
+
114
+ # Create lock on first use to detect the async framework
115
+ if self ._lock is None :
116
+ try :
117
+ # Try to get the current event loop
118
+ self ._lock = asyncio .Lock ()
119
+ except RuntimeError :
120
+ # If no event loop, we're probably in a different
121
+ # async framework
122
+ # For now, we'll fall back to the original behavior
123
+ # This maintains compatibility while fixing the asyncio case
124
+ if self ._cache is _sentinel :
125
+ self ._cache = await self ._coro
126
+ # This return is unreachable in practice due to race timing.
127
+ # The cache would need to be set by another coroutine between
128
+ # the initial check (line 111) and this point.
129
+ return self ._cache # type: ignore # pragma: no cover
130
+
131
+ async with self ._lock :
132
+ # Double-check after acquiring the lock
133
+ if self ._cache is _sentinel :
134
+ self ._cache = await self ._coro
135
+ # This return is unreachable in practice due to race timing.
136
+ # The cache would need to be set by another coroutine while waiting
137
+ # for the lock, but that's prevented by the lock mechanism itself.
138
+ return self ._cache # type: ignore # pragma: no cover
107
139
108
140
109
141
def reawaitable (
0 commit comments