Skip to content

bpo-40616: Add asyncio.BufferQueue #20071

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Doc/library/asyncio-queue.rst
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,15 @@ LIFO Queue
entries first (last in, first out).


Buffer Queue
============

.. class:: BufferQueue

A variant of :class:`Queue` that drops earliest added entries when
:meth:`qsize` exceeds :attr:`maxsize`.


Exceptions
==========

Expand Down
29 changes: 28 additions & 1 deletion Lib/asyncio/queues.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')
__all__ = (
'Queue', 'PriorityQueue', 'LifoQueue', 'BufferQueue', 'QueueFull', 'QueueEmpty',
)

import collections
import heapq
Expand Down Expand Up @@ -250,3 +252,28 @@ def _put(self, item):

def _get(self):
return self._queue.pop()


class BufferQueue(Queue):
"""A subclass of Queue that drops earliest added entries when `BufferQueue.qsize()`
exceeds `BufferQueue.maxsize`.

Acts exactly as Queue if maxsize is not provided or less than or equal to zero.
"""

def _init(self, maxsize):
"""Leverages `colections.deque` built-in functionality."""
maxlen = maxsize if maxsize is not None and maxsize > 0 else None
self._queue = collections.deque(maxlen=maxlen)

def _put(self, item):
"""Fixes counters when BufferQueue is about to drop an item to allow join to
work properly.
"""
if Queue.full(self):
self.task_done()
self._queue.append(item)

def full(self):
"""Always returns `False` as BufferQueue can never be overflown."""
return False
18 changes: 18 additions & 0 deletions Lib/test/test_asyncio/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,20 @@ def test_order(self):
self.assertEqual([1, 2, 3], items)


class BufferQueueTests(_QueueTestBase):

def test_buffer(self):
with self.assertWarns(DeprecationWarning):
q = asyncio.BufferQueue(maxsize=2, loop=self.loop)
for i in [1, 3, 2]:
q.put_nowait(i)

self.assertEqual(2, q.qsize())

items = [q.get_nowait() for _ in range(2)]
self.assertEqual([3, 2], items)


class _QueueJoinTestMixin:

q_class = None
Expand Down Expand Up @@ -714,5 +728,9 @@ class PriorityQueueJoinTests(_QueueJoinTestMixin, _QueueTestBase):
q_class = asyncio.PriorityQueue


class BufferQueueJoinTests(_QueueJoinTestMixin, _QueueTestBase):
q_class = asyncio.BufferQueue


if __name__ == '__main__':
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Added :class:`asyncio.BufferQueue` which implements the circular buffer
concept.