Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions Lib/asyncio/locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from . import exceptions
from . import mixins
from . import tasks


class _ContextManagerMixin:
Expand Down Expand Up @@ -350,6 +351,7 @@ def __init__(self, value=1, *, loop=mixins._marker):
raise ValueError("Semaphore initial value must be >= 0")
self._value = value
self._waiters = collections.deque()
self._wakeup_scheduled = False

def __repr__(self):
res = super().__repr__()
Expand All @@ -363,6 +365,7 @@ def _wake_up_next(self):
waiter = self._waiters.popleft()
if not waiter.done():
waiter.set_result(None)
self._wakeup_scheduled = True
return

def locked(self):
Expand All @@ -378,16 +381,17 @@ async def acquire(self):
called release() to make it larger than 0, and then return
True.
"""
while self._value <= 0:
# _wakeup_scheduled is set if *another* task is scheduled to wakeup
# but its acquire() is not resumed yet
while self._wakeup_scheduled or self._value <= 0:
fut = self._get_loop().create_future()
self._waiters.append(fut)
try:
await fut
except:
# See the similar code in Queue.get.
fut.cancel()
if self._value > 0 and not fut.cancelled():
self._wake_up_next()
# reset _wakeup_scheduled *after* waiting for a future
self._wakeup_scheduled = False
except exceptions.CancelledError:
self._wake_up_next()
raise
self._value -= 1
return True
Expand Down
26 changes: 26 additions & 0 deletions Lib/test/test_asyncio/test_locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,32 @@ async def test_release_no_waiters(self):
sem.release()
self.assertFalse(sem.locked())

async def test_acquire_fifo_order(self):
sem = asyncio.Semaphore(1)
result = []

async def coro(tag):
await sem.acquire()
result.append(f'{tag}_1')
await asyncio.sleep(0.01)
sem.release()

await sem.acquire()
result.append(f'{tag}_2')
await asyncio.sleep(0.01)
sem.release()

t1 = asyncio.create_task(coro('c1'))
t2 = asyncio.create_task(coro('c2'))
t3 = asyncio.create_task(coro('c3'))

await asyncio.gather(t1, t2, t3)

self.assertEqual(
['c1_1', 'c2_1', 'c3_1', 'c1_2', 'c2_2', 'c3_2'],
result
)


if __name__ == '__main__':
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix :class:`asyncio.Semaphore` re-aquiring FIFO order.