Skip to content

Commit ef1678a

Browse files
Migrate work for finished CurrentThreadExecutor to previous executor (#494)
* Redirect work for finished CurrentThreadExecutor to previous executor A CurrentThreadExecutor may terminate with work still remaining in its queue, or new work may be submitted later. We previously discarded remaining work, leading to deadlocks, and raised an error on submitting late work. Instead, execute remaining work immediately, and if there’s another CurrentThreadExecutor for the same thread below us on the stack, redirect late work there to allow it to eventually run. Doing this in a thread-safe way requires replacing the queue.Queue abstraction with collections.deque and threading.ConditionVariable (the same primitives used to implement queue.Queue). Fixes #491; fixes #492. Signed-off-by: Anders Kaseorg <[email protected]> * Add regression tests for #491 and #492 Signed-off-by: Anders Kaseorg <[email protected]> * Skip asyncio.Barrier test for Python <3.11. * Make test definition Python version dependent. Avoids mypy error on asyncio.Barrier API. --------- Signed-off-by: Anders Kaseorg <[email protected]> Co-authored-by: Carlton Gibson <[email protected]>
1 parent 37870f5 commit ef1678a

File tree

3 files changed

+114
-24
lines changed

3 files changed

+114
-24
lines changed

asgiref/current_thread_executor.py

Lines changed: 40 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
import queue
21
import sys
32
import threading
3+
from collections import deque
44
from concurrent.futures import Executor, Future
5-
from typing import TYPE_CHECKING, Any, Callable, TypeVar, Union
5+
from typing import TYPE_CHECKING, Any, Callable, TypeVar
66

77
if sys.version_info >= (3, 10):
88
from typing import ParamSpec
@@ -53,10 +53,12 @@ class CurrentThreadExecutor(Executor):
5353
the thread they came from.
5454
"""
5555

56-
def __init__(self) -> None:
56+
def __init__(self, old_executor: "CurrentThreadExecutor | None") -> None:
5757
self._work_thread = threading.current_thread()
58-
self._work_queue: queue.Queue[Union[_WorkItem, "Future[Any]"]] = queue.Queue()
59-
self._broken = False
58+
self._work_ready = threading.Condition(threading.Lock())
59+
self._work_items = deque[_WorkItem]() # synchronized by _work_ready
60+
self._broken = False # synchronized by _work_ready
61+
self._old_executor = old_executor
6062

6163
def run_until_future(self, future: "Future[Any]") -> None:
6264
"""
@@ -68,20 +70,25 @@ def run_until_future(self, future: "Future[Any]") -> None:
6870
raise RuntimeError(
6971
"You cannot run CurrentThreadExecutor from a different thread"
7072
)
71-
future.add_done_callback(self._work_queue.put)
72-
# Keep getting and running work items until we get the future we're waiting for
73-
# back via the future's done callback.
74-
try:
75-
while True:
73+
74+
def done(future: "Future[Any]") -> None:
75+
with self._work_ready:
76+
self._broken = True
77+
self._work_ready.notify()
78+
79+
future.add_done_callback(done)
80+
# Keep getting and running work items until the future we're waiting for
81+
# is done and the queue is empty.
82+
while True:
83+
with self._work_ready:
84+
while not self._work_items and not self._broken:
85+
self._work_ready.wait()
86+
if not self._work_items:
87+
break
7688
# Get a work item and run it
77-
work_item = self._work_queue.get()
78-
if work_item is future:
79-
return
80-
assert isinstance(work_item, _WorkItem)
81-
work_item.run()
82-
del work_item
83-
finally:
84-
self._broken = True
89+
work_item = self._work_items.popleft()
90+
work_item.run()
91+
del work_item
8592

8693
def _submit(
8794
self,
@@ -94,13 +101,23 @@ def _submit(
94101
raise RuntimeError(
95102
"You cannot submit onto CurrentThreadExecutor from its own thread"
96103
)
97-
# Check they're not too late or the executor errored
98-
if self._broken:
99-
raise RuntimeError("CurrentThreadExecutor already quit or is broken")
100-
# Add to work queue
101104
f: "Future[_R]" = Future()
102105
work_item = _WorkItem(f, fn, *args, **kwargs)
103-
self._work_queue.put(work_item)
106+
107+
# Walk up the CurrentThreadExecutor stack to find the closest one still
108+
# running
109+
executor = self
110+
while True:
111+
with executor._work_ready:
112+
if not executor._broken:
113+
# Add to work queue
114+
executor._work_items.append(work_item)
115+
executor._work_ready.notify()
116+
break
117+
if executor._old_executor is None:
118+
raise RuntimeError("CurrentThreadExecutor already quit or is broken")
119+
executor = executor._old_executor
120+
104121
# Return the future
105122
return f
106123

asgiref/sync.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R:
195195
# need one for every sync frame, even if there's one above us in the
196196
# same thread.
197197
old_executor = getattr(self.executors, "current", None)
198-
current_executor = CurrentThreadExecutor()
198+
current_executor = CurrentThreadExecutor(old_executor)
199199
self.executors.current = current_executor
200200

201201
# Wrapping context in list so it can be reassigned from within

tests/test_sync.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1208,3 +1208,76 @@ def test_function(**kwargs: Any) -> None:
12081208

12091209
# SyncToAsync.__call__.loop.run_in_executor has a param named `task_context`.
12101210
await test_function(task_context=1)
1211+
1212+
1213+
def test_nested_task() -> None:
1214+
async def inner() -> asyncio.Task[None]:
1215+
return asyncio.create_task(sync_to_async(print)("inner"))
1216+
1217+
async def main() -> None:
1218+
task = await sync_to_async(async_to_sync(inner))()
1219+
await task
1220+
1221+
async_to_sync(main)()
1222+
1223+
1224+
def test_nested_task_later() -> None:
1225+
def later(fut: asyncio.Future[asyncio.Task[None]]) -> None:
1226+
task = asyncio.create_task(sync_to_async(print)("later"))
1227+
fut.set_result(task)
1228+
1229+
async def inner() -> asyncio.Future[asyncio.Task[None]]:
1230+
loop = asyncio.get_running_loop()
1231+
fut = loop.create_future()
1232+
loop.call_later(0.1, later, fut)
1233+
return fut
1234+
1235+
async def main() -> None:
1236+
fut = await sync_to_async(async_to_sync(inner))()
1237+
task = await fut
1238+
await task
1239+
1240+
async_to_sync(main)()
1241+
1242+
1243+
def test_double_nested_task() -> None:
1244+
async def inner() -> asyncio.Task[None]:
1245+
return asyncio.create_task(sync_to_async(print)("inner"))
1246+
1247+
async def outer() -> asyncio.Task[asyncio.Task[None]]:
1248+
return asyncio.create_task(sync_to_async(async_to_sync(inner))())
1249+
1250+
async def main() -> None:
1251+
outer_task = await sync_to_async(async_to_sync(outer))()
1252+
inner_task = await outer_task
1253+
await inner_task
1254+
1255+
async_to_sync(main)()
1256+
1257+
1258+
# asyncio.Barrier is new in Python 3.11. Nest definition (rather than using
1259+
# skipIf) to avoid mypy error.
1260+
if sys.version_info >= (3, 11):
1261+
1262+
def test_two_nested_tasks_with_asyncio_run() -> None:
1263+
barrier = asyncio.Barrier(3)
1264+
event = threading.Event()
1265+
1266+
async def inner() -> None:
1267+
task = asyncio.create_task(sync_to_async(event.wait)())
1268+
await barrier.wait()
1269+
await task
1270+
1271+
async def outer() -> tuple[asyncio.Task[None], asyncio.Task[None]]:
1272+
task0 = asyncio.create_task(inner())
1273+
task1 = asyncio.create_task(inner())
1274+
await barrier.wait()
1275+
event.set()
1276+
return task0, task1
1277+
1278+
async def main() -> None:
1279+
task0, task1 = await sync_to_async(async_to_sync(outer))()
1280+
await task0
1281+
await task1
1282+
1283+
asyncio.run(main())

0 commit comments

Comments
 (0)