From 8f36f7609200c4a80a6f75988cf5e924a7066a36 Mon Sep 17 00:00:00 2001 From: Tony Solomonik Date: Wed, 15 Jul 2020 19:34:46 +0300 Subject: [PATCH 1/2] bpo-41305: Add StreamReader.readinto() (GH-21491) This function is useful when you want to fill an already allocated buffer with data read from the stream. --- Doc/library/asyncio-stream.rst | 7 ++++ Lib/asyncio/streams.py | 38 +++++++++++++++++++ .../2020-07-15-19-34-39.bpo-41305.3In2Tp.rst | 1 + 3 files changed, 46 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2020-07-15-19-34-39.bpo-41305.3In2Tp.rst diff --git a/Doc/library/asyncio-stream.rst b/Doc/library/asyncio-stream.rst index b76ed379c7f4c8..f6624a7cd0c902 100644 --- a/Doc/library/asyncio-stream.rst +++ b/Doc/library/asyncio-stream.rst @@ -211,6 +211,13 @@ StreamReader .. versionadded:: 3.5.2 + .. coroutinemethod:: readinto(buf) + + Read up to *n* bytes with *n* being equal to the length of *buf* and + copy the buffer read from the stream into *buf*. + + Return the number of bytes read from the stream. + .. method:: at_eof() Return ``True`` if the buffer is empty and :meth:`feed_eof` diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index 3c80bb88925905..e6a1a0eff2c5ca 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -690,6 +690,44 @@ async def read(self, n=-1): self._maybe_resume_transport() return data + async def readinto(self, buf): + """Read up to the length of `buf` number of bytes from the stream and + fill the `buf` bytes object with what was read from the stream. + + Returns the number of bytes read from the stream. + + If `buf` is empty, return 0 immediately. + + If `buf` is not empty, this function tries to read the length of `buf` + number of bytes, and may fill the buffer with less or equal bytes than + requested, but at least one byte. If EOF was received before any byte + is read, this function returns 0. + + Returned value is not limited with limit, configured at stream + creation. + + If stream was paused, this function will automatically resume it if + needed. + """ + n = len(buf) + + if self._exception is not None: + raise self._exception + + if n == 0: + return 0 + + if not self._buffer and not self._eof: + await self._wait_for_data('readinto') + + length = min(n, len(self._buffer)) + if length > 0: + buf[:length] = self._buffer[:length] + + del self._buffer[:n] + self._maybe_resume_transport() + return length + async def readexactly(self, n): """Read exactly `n` bytes. diff --git a/Misc/NEWS.d/next/Library/2020-07-15-19-34-39.bpo-41305.3In2Tp.rst b/Misc/NEWS.d/next/Library/2020-07-15-19-34-39.bpo-41305.3In2Tp.rst new file mode 100644 index 00000000000000..342cf363b65fe2 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2020-07-15-19-34-39.bpo-41305.3In2Tp.rst @@ -0,0 +1 @@ +Add the ``StreamReader.readinto(buf)`` function. From c8a0d69e41481e61b8faa31f31e23d54043fdc49 Mon Sep 17 00:00:00 2001 From: Tony Solomonik Date: Wed, 15 Jul 2020 20:38:30 +0300 Subject: [PATCH 2/2] bpo-41305: Add tests for StreamReader.readinto() (GH-21491) --- Lib/test/test_asyncio/test_streams.py | 74 +++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py index 1e9d115661d087..976fde50c2f761 100644 --- a/Lib/test/test_asyncio/test_streams.py +++ b/Lib/test/test_asyncio/test_streams.py @@ -236,6 +236,80 @@ def test_read_limit(self): self.assertEqual(b'chunk', data) self.assertEqual(b'', stream._buffer) + def test_readinto_empty_buffer(self): + # Read zero bytes. + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(self.DATA) + + data = bytearray(0) + length = self.loop.run_until_complete(stream.readinto(data)) + self.assertEqual(0, length) + self.assertEqual(b'', data) + self.assertEqual(self.DATA, stream._buffer) + + def test_readinto(self): + # Read bytes. + stream = asyncio.StreamReader(loop=self.loop) + data = bytearray(30) + read_task = self.loop.create_task(stream.readinto(data)) + + def cb(): + stream.feed_data(self.DATA) + self.loop.call_soon(cb) + + length = self.loop.run_until_complete(read_task) + self.assertEqual(len(self.DATA), length) + self.assertEqual(self.DATA, data[:length]) + self.assertEqual(b'', stream._buffer) + + def test_readinto_line_breaks(self): + # Read bytes without line breaks. + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(b'line1') + stream.feed_data(b'line2') + + data = bytearray(5) + length = self.loop.run_until_complete(stream.readinto(data)) + + self.assertEqual(5, length) + self.assertEqual(b'line1', data) + self.assertEqual(b'line2', stream._buffer) + + def test_readinto_eof(self): + # Read bytes, stop at eof. + stream = asyncio.StreamReader(loop=self.loop) + data = bytearray(1024) + read_task = self.loop.create_task(stream.readinto(data)) + + def cb(): + stream.feed_eof() + self.loop.call_soon(cb) + + length = self.loop.run_until_complete(read_task) + self.assertEqual(0, length) + self.assertEqual(b'', stream._buffer) + + def test_readinto_exception(self): + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(b'line\n') + + data = bytearray(2) + length = self.loop.run_until_complete(stream.readinto(data)) + self.assertEqual(b'li', data) + + stream.set_exception(ValueError()) + self.assertRaises( + ValueError, self.loop.run_until_complete, stream.readinto(data)) + + def test_readinto_limit(self): + stream = asyncio.StreamReader(limit=3, loop=self.loop) + stream.feed_data(b'chunk') + data = bytearray(5) + length = self.loop.run_until_complete(stream.readinto(data)) + self.assertEqual(5, length) + self.assertEqual(b'chunk', data) + self.assertEqual(b'', stream._buffer) + def test_readline(self): # Read one line. 'readline' will need to wait for the data # to come from 'cb'