
bpo-41279: Add StreamReaderBufferedProtocol #21446


Open · wants to merge 6 commits into main
116 changes: 84 additions & 32 deletions Lib/asyncio/proactor_events.py
@@ -180,14 +180,29 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,

def __init__(self, loop, sock, protocol, waiter=None,
extra=None, server=None, buffer_size=65536):
self._pending_data_length = -1
self._paused = True
self._buffer_size = buffer_size
self._pending_data = None
self._pending_data_length = -1
self._recv_fut_canceled = False
self._read_fut = None
self._protocol = None

super().__init__(loop, sock, protocol, waiter, extra, server)

self._data = bytearray(buffer_size)
self._loop.call_soon(self._loop_reading)
self._paused = False

def set_protocol(self, protocol):
super().set_protocol(protocol)

if isinstance(protocol, protocols.BufferedProtocol):
self._data = protocol.get_buffer(self._buffer_size)

if self._read_fut:
self._read_fut.cancel()
self._recv_fut_canceled = True

def is_reading(self):
return not self._paused and not self._closing

@@ -218,12 +233,20 @@ def resume_reading(self):
if self._read_fut is None:
self._loop.call_soon(self._loop_reading, None)

length = self._pending_data_length
self._pending_data_length = -1
if length > -1:
# Call the protocol method after calling _loop_reading(),
# since the protocol can decide to pause reading again.
self._loop.call_soon(self._data_received, self._data[:length], length)
if isinstance(self._protocol, protocols.BufferedProtocol):
Member:
I don't see the corresponding changes in selector_events.py. Does it handle these use cases already?

Contributor Author:

Yes, it has the following bit of code:

    def set_protocol(self, protocol):
        if isinstance(protocol, protocols.BufferedProtocol):
            self._read_ready_cb = self._read_ready__get_buffer
        else:
            self._read_ready_cb = self._read_ready__data_received

        super().set_protocol(protocol)

length = self._pending_data_length
self._pending_data_length = -1
if length > -1:
# Call the protocol method after calling _loop_reading(),
# since the protocol can decide to pause reading again.
self._loop.call_soon(self._buffer_updated, length)
else:
data = self._pending_data
self._pending_data = None
if data is not None:
# Call the protocol method after calling _loop_reading(),
# since the protocol can decide to pause reading again.
self._loop.call_soon(self._data_received, data)

if self._loop.get_debug():
logger.debug("%r resumes reading", self)
@@ -244,7 +267,7 @@ def _eof_received(self):
if not keep_open:
self.close()

def _data_received(self, data, length):
def _buffer_updated(self, length):
if self._paused:
# Don't call any protocol method while reading is paused.
# The protocol will be called on resume_reading().
@@ -256,35 +279,62 @@ def _data_received(self, data, length):
self._eof_received()
return

try:
self._protocol.buffer_updated(length)
except BaseException as exc:
self._fatal_error(exc,
'Fatal error: protocol.buffer_updated() '
'call failed.')

def _data_received(self, data):
if self._paused:
# Don't call any protocol method while reading is paused.
# The protocol will be called on resume_reading().
assert self._pending_data is None
self._pending_data = data
return

if not data:
self._eof_received()
return

self._protocol.data_received(data)

def _handle_recv_result(self, result):
"""
Handles the future result of recv / recv_into.
Returns whether reading should continue (False on EOF).
"""
if isinstance(self._protocol, protocols.BufferedProtocol):
try:
protocols._feed_data_to_buffered_proto(self._protocol, data)
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
self._fatal_error(exc,
'Fatal error: protocol.buffer_updated() '
'call failed.')
return
length = result
if length > -1:
self._buffer_updated(length)
if length == 0:
return False
else:
self._protocol.data_received(data)
data = result
self._data_received(data)
if not data:
return False
return True

def _loop_reading(self, fut=None):
length = -1
data = None
try:
if fut is not None:
assert self._read_fut is fut or (self._read_fut is None and
self._closing)
self._read_fut = None
if fut.done():
# deliver data later in "finally" clause
length = fut.result()
if length == 0:
# we got end-of-file so no need to reschedule a new read
return

data = self._data[:length]
try:
if not self._handle_recv_result(fut.result()):
# we got end-of-file so no need to reschedule a new read
return
except exceptions.CancelledError:
if self._recv_fut_canceled:
# a cancellation is expected on a change of protocol;
# consume the flag so later cancellations propagate
self._recv_fut_canceled = False
else:
raise
else:
# the future will be replaced by next proactor.recv call
fut.cancel()
@@ -298,7 +348,12 @@ def _loop_reading(self, fut=None):

if not self._paused:
# reschedule a new read
self._read_fut = self._loop._proactor.recv_into(self._sock, self._data)
if isinstance(self._protocol, protocols.BufferedProtocol):
self._read_fut = self._loop._proactor.recv_into(
self._sock, self._data)
else:
self._read_fut = self._loop._proactor.recv(
self._sock, self._buffer_size)
except ConnectionAbortedError as exc:
if not self._closing:
self._fatal_error(exc, 'Fatal read error on pipe transport')
@@ -315,9 +370,6 @@ def _loop_reading(self, fut=None):
else:
if not self._paused:
self._read_fut.add_done_callback(self._loop_reading)
finally:
if length > -1:
self._data_received(data, length)


class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
33 changes: 26 additions & 7 deletions Lib/asyncio/streams.py
@@ -112,7 +112,7 @@ def factory():
return await loop.create_unix_server(factory, path, **kwds)


class FlowControlMixin(protocols.Protocol):
class FlowControlMixin(protocols.BaseProtocol):
"""Reusable flow control logic for StreamWriter.drain().

This implements the protocol methods pause_writing(),
@@ -180,7 +180,7 @@ def _get_close_waiter(self, stream):
raise NotImplementedError


class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
class BaseStreamReaderProtocol(FlowControlMixin):
"""Helper class to adapt between Protocol and StreamReader.

(This is a helper class instead of making StreamReader itself a
@@ -267,11 +267,6 @@ def connection_lost(self, exc):
self._stream_writer = None
self._transport = None

def data_received(self, data):
reader = self._stream_reader
if reader is not None:
reader.feed_data(data)

def eof_received(self):
reader = self._stream_reader
if reader is not None:
@@ -298,6 +293,30 @@ def __del__(self):
closed.exception()


class StreamReaderProtocol(BaseStreamReaderProtocol, protocols.Protocol):
def data_received(self, data):
reader = self._stream_reader
if reader is not None:
reader.feed_data(data)


class StreamReaderBufferedProtocol(BaseStreamReaderProtocol, protocols.BufferedProtocol):
Contributor:

Sorry, I don't follow.
Where is the StreamReaderBufferedProtocol class used (except in tests)?

Contributor Author:

It isn't used right now, as there is no way to include this class without changing or adding to the API.
It will be useful in the future, though, once we do make API changes.

Contributor:

Please elaborate.
From my understanding, the PR contains only dead code. Regardless of its correctness, it is not used by the streams implementation.
What future API changes are you talking about? Can we apply those changes as part of this pull request?

I like this PR; it adds value if the buffered protocol is used. But if this protocol is not used, why do we need the PR at all?

I can imagine a slightly different approach: don't extract BaseStreamReaderProtocol, but have StreamReaderProtocol inherit from both Protocol and BufferedProtocol, implementing data_received(), get_buffer() and buffer_updated() all in the same class; a rough sketch follows below.
asyncio already selects the best approach: it uses the buffered-protocol path if the protocol is an instance of BufferedProtocol, falling back to data_received() otherwise. We can utilize this facility.

What do you think?
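
For illustration only, a rough sketch of that combined class (hypothetical, not part of this PR; the name and buffer strategy are assumptions). Because event loops dispatch on isinstance(protocol, BufferedProtocol), the buffered path would take precedence:

    from asyncio import protocols
    from asyncio.streams import FlowControlMixin

    class CombinedStreamReaderProtocol(FlowControlMixin,
                                       protocols.BufferedProtocol):
        # Hypothetical sketch: one class implementing both callback styles.
        # Loops that check isinstance(protocol, BufferedProtocol) would use
        # get_buffer()/buffer_updated(); data_received() stays as a fallback.

        def __init__(self, stream_reader, loop=None, buffer_size=65536):
            super().__init__(loop=loop)
            self._stream_reader = stream_reader
            self._buffer = memoryview(bytearray(buffer_size))

        def data_received(self, data):
            if self._stream_reader is not None:
                self._stream_reader.feed_data(data)

        def get_buffer(self, sizehint):
            return self._buffer

        def buffer_updated(self, nbytes):
            if self._stream_reader is not None:
                self._stream_reader.feed_data(self._buffer[:nbytes])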

Contributor Author (@tontinton, Aug 4, 2020):

The problem with this approach is that the BufferedProtocol allocates a 64 KiB buffer, so on servers with many connections memory runs out quickly, as each stream now holds a 64 KiB buffer.

For example, on a server with 10k clients, Python will hold 10,000 × 64 KiB ≈ 640 MB of memory.

This is why we should probably make it an explicit choice when to use the new BufferedProtocol, or create some kind of small protocol pool so that the first few streams are faster.

But this means that we either change / add API, or pick a fixed default size for the protocol pool.

By the way, the buffered-protocol pool approach is not really trivial, as I don't currently know when a BufferedProtocol needs to be returned to the pool (maybe I can think of something if you tell me it's worth it; I'd be happy to implement something like that).

Contributor:

We already have the limit argument to control the output buffer.
We can reuse the same number to control the input buffer size as well.
In practice, I very much doubt people need different values for those.

I can imagine a variable input buffer size starting from some default value and growing up to a limit, but I doubt we really need it. A simple implementation can be good enough.

A pool of buffers is an interesting idea but, again, that feature can be added later.

Thoughts?

Contributor Author (@tontinton, Aug 5, 2020):

> We already have the limit argument to control the output buffer.
> We can reuse the same number to control the input buffer size as well.
> In practice, I very much doubt people need different values for those.

I don't really understand what you mean by using the limit argument as an input buffer size: StreamReader's limit argument actually controls the buffer limit that the read APIs (such as readexactly() or readline()) consume the buffer through, so in a sense this argument already controls the input buffer size.

Initially, what I did was use the BufferedProtocol instead of the regular Protocol whenever the limit received was 0, but @1st1 didn't like this, as it was essentially an API change (and the API is frozen, understandably).

> I can imagine a variable input buffer size starting from some default value and growing up to a limit, but I doubt we really need it. A simple implementation can be good enough.

I believe what you mean is already implemented in StreamReader, but I am not sure.
Also, I am against reallocating the buffer, as it would remove all the benefits of BufferedProtocol.

> A pool of buffers is an interesting idea but, again, that feature can be added later.

I think this is the best solution, though we need to decide whether we want that feature in this PR or in a separate one.

By the way, if you know of a nice way to tell when a BufferedProtocol should be returned to the pool, I'd like to hear your thoughts.
Right now I am thinking of adding a callback that is called when the protocol has closed, or something like that; a minimal sketch follows below.
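
For what it's worth, a minimal sketch of such a pool (entirely hypothetical, not part of this PR); a protocol holding a pooled buffer would call release() from connection_lost(), which is effectively the "protocol has closed" callback mentioned above:

    import collections

    class BufferPool:
        # Hypothetical fixed-size pool of read buffers.
        def __init__(self, nbuffers=64, buffer_size=65536):
            self._free = collections.deque(
                memoryview(bytearray(buffer_size)) for _ in range(nbuffers))

        def acquire(self):
            # None when exhausted; the caller would then fall back to the
            # plain Protocol / data_received() path.
            return self._free.popleft() if self._free else None

        def release(self, buf):
            self._free.append(buf)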

Contributor:

I reckon that the default buffer size need not be decided by the limit argument. We might start with a relatively small buffer, e.g. io.DEFAULT_BUFFER_SIZE or less, then expand and shrink the buffer on demand; see the sketch below.

Additionally, it appears that feed_data incurs an extra copy, so would it be viable to write directly into a buffer owned by BaseStreamReaderProtocol instead of the one in StreamReaderBufferedProtocol, or, more aggressively, to utilize the efficient and mature io.BufferedIOBase to implement StreamReaderBufferedProtocol?
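
A rough sketch of the grow-on-demand idea (hypothetical; the growth policy is an assumption, shrinking is omitted for brevity, and note the objection above that reallocation erodes the benefits of BufferedProtocol):

    import io

    class GrowableReadBuffer:
        # Hypothetical buffer that starts small and grows toward a limit.
        def __init__(self, limit=65536):
            self._limit = limit
            self._data = bytearray(io.DEFAULT_BUFFER_SIZE)  # 8 KiB default

        def get_buffer(self, sizehint):
            # sizehint may be -1, meaning "no preferred size".
            if sizehint > len(self._data):
                self._data = bytearray(min(sizehint, self._limit))
            return memoryview(self._data)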

def __init__(self, stream_reader, client_connected_cb=None, loop=None,
buffer_size=65536):
super().__init__(stream_reader,
client_connected_cb=client_connected_cb,
loop=loop)
self._buffer = memoryview(bytearray(buffer_size))

def get_buffer(self, sizehint):
return self._buffer

def buffer_updated(self, nbytes):
reader = self._stream_reader
if reader is not None:
reader.feed_data(self._buffer[:nbytes])
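
Since this class is not yet reachable through the public stream helpers, here is a minimal sketch of how it could be wired up manually, mirroring what asyncio.open_connection() does for StreamReaderProtocol (the helper name is hypothetical, not part of this PR):

    import asyncio
    from asyncio import streams

    async def open_buffered_connection(host, port, *, buffer_size=65536):
        # Hypothetical helper built on this PR's StreamReaderBufferedProtocol.
        loop = asyncio.get_running_loop()
        reader = asyncio.StreamReader(loop=loop)
        protocol = streams.StreamReaderBufferedProtocol(
            reader, loop=loop, buffer_size=buffer_size)
        transport, _ = await loop.create_connection(
            lambda: protocol, host, port)
        writer = asyncio.StreamWriter(transport, protocol, reader, loop)
        return reader, writer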


class StreamWriter:
"""Wraps a Transport.

41 changes: 38 additions & 3 deletions Lib/asyncio/unix_events.py
@@ -20,6 +20,7 @@
from . import events
from . import exceptions
from . import futures
from . import protocols
from . import selector_events
from . import tasks
from . import transports
@@ -480,9 +481,13 @@ def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
os.set_blocking(self._fileno, False)

self._loop.call_soon(self._protocol.connection_made, self)

# only start reading when connection_made() has been called
self._loop.call_soon(self._loop._add_reader,
self._fileno, self._read_ready)
if isinstance(protocol, protocols.BufferedProtocol):
self._read_ready = self._readinto_buffer_ready
else:
self._read_ready = self._read_buffer_ready
self._loop.call_soon(self._loop._add_reader, self._fileno, self._read_ready)
if waiter is not None:
# only wake up the waiter when connection_made() has been called
self._loop.call_soon(futures._set_result_unless_cancelled,
@@ -509,7 +514,37 @@ def __repr__(self):
info.append('closed')
return '<{}>'.format(' '.join(info))

def _read_ready(self):
def _readinto_buffer_ready(self):
try:
buf = self._protocol.get_buffer(-1)
if not len(buf):
raise RuntimeError('get_buffer() returned an empty buffer')
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
Member (@1st1, Jul 19, 2020):

Is there some specific need to handle BaseException here instead of Exception?

Never mind, this is how we typically invoke callback methods.

Contributor Author:

👍

self._fatal_error(
exc, 'Fatal error: protocol.get_buffer() call failed.')
return

nbytes = 0
try:
nbytes = self._pipe.readinto(buf)
except (BlockingIOError, InterruptedError):
pass
except OSError as exc:
self._fatal_error(exc, 'Fatal read error on pipe transport')
else:
if nbytes:
self._protocol.buffer_updated(nbytes)
else:
if self._loop.get_debug():
logger.info("%r was closed by peer", self)
self._closing = True
self._loop._remove_reader(self._fileno)
self._loop.call_soon(self._protocol.eof_received)
self._loop.call_soon(self._call_connection_lost, None)

def _read_buffer_ready(self):
try:
data = os.read(self._fileno, self.max_size)
except (BlockingIOError, InterruptedError):