diff --git a/Doc/library/asyncio-queue.rst b/Doc/library/asyncio-queue.rst index 524560b691d720..59733f825a1758 100644 --- a/Doc/library/asyncio-queue.rst +++ b/Doc/library/asyncio-queue.rst @@ -130,6 +130,15 @@ LIFO Queue entries first (last in, first out). +Buffer Queue +============ + +.. class:: BufferQueue + + A variant of :class:`Queue` that drops earliest added entries when + :meth:`qsize` exceeds :attr:`maxsize`. + + Exceptions ========== diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index cd3f7c6a567891..7f7b18513b2d9e 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -1,4 +1,6 @@ -__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty') +__all__ = ( + 'Queue', 'PriorityQueue', 'LifoQueue', 'BufferQueue', 'QueueFull', 'QueueEmpty', +) import collections import heapq @@ -250,3 +252,28 @@ def _put(self, item): def _get(self): return self._queue.pop() + + +class BufferQueue(Queue): + """A subclass of Queue that drops earliest added entries when `BufferQueue.qsize()` + exceeds `BufferQueue.maxsize`. + + Acts exactly as Queue if maxsize is not provided or less than or equal to zero. + """ + + def _init(self, maxsize): + """Leverages `colections.deque` built-in functionality.""" + maxlen = maxsize if maxsize is not None and maxsize > 0 else None + self._queue = collections.deque(maxlen=maxlen) + + def _put(self, item): + """Fixes counters when BufferQueue is about to drop an item to allow join to + work properly. + """ + if Queue.full(self): + self.task_done() + self._queue.append(item) + + def full(self): + """Always returns `False` as BufferQueue can never be overflown.""" + return False diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index 5c9aaa82c311a5..c88f31ecd91a7f 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -635,6 +635,20 @@ def test_order(self): self.assertEqual([1, 2, 3], items) +class BufferQueueTests(_QueueTestBase): + + def test_buffer(self): + with self.assertWarns(DeprecationWarning): + q = asyncio.BufferQueue(maxsize=2, loop=self.loop) + for i in [1, 3, 2]: + q.put_nowait(i) + + self.assertEqual(2, q.qsize()) + + items = [q.get_nowait() for _ in range(2)] + self.assertEqual([3, 2], items) + + class _QueueJoinTestMixin: q_class = None @@ -714,5 +728,9 @@ class PriorityQueueJoinTests(_QueueJoinTestMixin, _QueueTestBase): q_class = asyncio.PriorityQueue +class BufferQueueJoinTests(_QueueJoinTestMixin, _QueueTestBase): + q_class = asyncio.BufferQueue + + if __name__ == '__main__': unittest.main() diff --git a/Misc/NEWS.d/next/Library/2020-05-13-19-58-46.bpo-40616.76LN-f.rst b/Misc/NEWS.d/next/Library/2020-05-13-19-58-46.bpo-40616.76LN-f.rst new file mode 100644 index 00000000000000..0644ad4e8a7cd2 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2020-05-13-19-58-46.bpo-40616.76LN-f.rst @@ -0,0 +1,2 @@ +Added :class:`asyncio.BufferQueue` which implements the circular buffer +concept.