Skip to content

gh-109051: asyncio: remove outgoing buffer from sslproto #109571

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,12 +205,14 @@ def _check_ssl_socket(sock):

class _SendfileFallbackProtocol(protocols.Protocol):
def __init__(self, transp):
if not isinstance(transp, transports._FlowControlMixin):
raise TypeError("transport should be _FlowControlMixin instance")
self._transport = transp
self._proto = transp.get_protocol()
self._should_resume_reading = transp.is_reading()
self._should_resume_writing = transp._protocol_paused

# should we expect a call to resume_writing?
_, high_water = transp.get_write_buffer_limits()
self._should_resume_writing = transp.get_write_buffer_size() > high_water

transp.pause_reading()
transp.set_protocol(self)
if self._should_resume_writing:
Expand Down
114 changes: 12 additions & 102 deletions Lib/asyncio/sslproto.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ def add_flowcontrol_defaults(high, low, kb):
return hi, lo


class _SSLProtocolTransport(transports._FlowControlMixin,
transports.Transport):
class _SSLProtocolTransport(transports.Transport):

_start_tls_compatible = True
_sendfile_compatible = constants._SendfileMode.FALLBACK
Expand Down Expand Up @@ -158,16 +157,14 @@ def set_write_buffer_limits(self, high=None, low=None):
reduces opportunities for doing I/O and computation
concurrently.
"""
self._ssl_protocol._set_write_buffer_limits(high, low)
self._ssl_protocol._control_app_writing()
self._ssl_protocol._transport.set_write_buffer_limits(high, low)

def get_write_buffer_limits(self):
return (self._ssl_protocol._outgoing_low_water,
self._ssl_protocol._outgoing_high_water)
return self._ssl_protocol._transport.get_write_buffer_limits()

def get_write_buffer_size(self):
"""Return the current size of the write buffers."""
return self._ssl_protocol._get_write_buffer_size()
return self._ssl_protocol._transport.get_write_buffer_size()

def set_read_buffer_limits(self, high=None, low=None):
"""Set the high- and low-water limits for read flow control.
Expand Down Expand Up @@ -199,11 +196,6 @@ def get_read_buffer_size(self):
"""Return the current size of the read buffer."""
return self._ssl_protocol._get_read_buffer_size()

@property
def _protocol_paused(self):
# Required for sendfile fallback pause_writing/resume_writing logic
return self._ssl_protocol._app_writing_paused

def write(self, data):
"""Write some data bytes to the transport.

Expand Down Expand Up @@ -251,11 +243,6 @@ def _force_close(self, exc):
self._closed = True
self._ssl_protocol._abort(exc)

def _test__append_write_backlog(self, data):
# for test only
self._ssl_protocol._write_backlog.append(data)
self._ssl_protocol._write_buffer_size += len(data)


class SSLProtocol(protocols.BufferedProtocol):
max_size = 256 * 1024 # Buffer size passed to read()
Expand Down Expand Up @@ -302,10 +289,6 @@ def __init__(self, loop, app_protocol, sslcontext, waiter,
# completes.
self._extra = dict(sslcontext=sslcontext)

# App data write buffering
self._write_backlog = collections.deque()
self._write_buffer_size = 0

self._waiter = waiter
self._loop = loop
self._set_app_protocol(app_protocol)
Expand All @@ -331,8 +314,6 @@ def __init__(self, loop, app_protocol, sslcontext, waiter,

# Flow Control

self._ssl_writing_paused = False

self._app_reading_paused = False

self._ssl_reading_paused = False
Expand All @@ -341,10 +322,6 @@ def __init__(self, loop, app_protocol, sslcontext, waiter,
self._set_read_buffer_limits()
self._eof_received = False

self._app_writing_paused = False
self._outgoing_high_water = 0
self._outgoing_low_water = 0
self._set_write_buffer_limits()
self._get_app_transport()

def _set_app_protocol(self, app_protocol):
Expand Down Expand Up @@ -391,7 +368,6 @@ def connection_lost(self, exc):
meaning a regular EOF is received or the connection was
aborted or closed).
"""
self._write_backlog.clear()
self._outgoing.read()
self._conn_lost += 1

Expand Down Expand Up @@ -468,7 +444,6 @@ def eof_received(self):
self._do_flush()

elif self._state == SSLProtocolState.FLUSHING:
self._do_write()
self._set_state(SSLProtocolState.SHUTDOWN)
self._do_shutdown()

Expand Down Expand Up @@ -682,38 +657,19 @@ def _write_appdata(self, list_of_data):
return

for data in list_of_data:
self._write_backlog.append(data)
self._write_buffer_size += len(data)
self._sslobj.write(data)

try:
if self._state == SSLProtocolState.WRAPPED:
self._do_write()
self._process_outgoing()

except Exception as ex:
self._fatal_error(ex, 'Fatal error on SSL protocol')

def _do_write(self):
try:
while self._write_backlog:
data = self._write_backlog[0]
count = self._sslobj.write(data)
data_len = len(data)
if count < data_len:
self._write_backlog[0] = data[count:]
self._write_buffer_size -= count
else:
del self._write_backlog[0]
self._write_buffer_size -= data_len
except SSLAgainErrors:
pass
self._process_outgoing()

def _process_outgoing(self):
if not self._ssl_writing_paused:
data = self._outgoing.read()
if len(data):
self._transport.write(data)
self._control_app_writing()
data = self._outgoing.read()
if len(data):
self._transport.write(data)

# Incoming flow

Expand All @@ -731,10 +687,7 @@ def _do_read(self):
self._do_read__buffered()
else:
self._do_read__copied()
if self._write_backlog:
self._do_write()
else:
self._process_outgoing()
self._process_outgoing()
self._control_ssl_reading()
except Exception as ex:
self._fatal_error(ex, 'Fatal error on SSL protocol')
Expand Down Expand Up @@ -811,46 +764,6 @@ def _call_eof_received(self):
except BaseException as ex:
self._fatal_error(ex, 'Error calling eof_received()')

# Flow control for writes from APP socket

def _control_app_writing(self):
size = self._get_write_buffer_size()
if size >= self._outgoing_high_water and not self._app_writing_paused:
self._app_writing_paused = True
try:
self._app_protocol.pause_writing()
except (KeyboardInterrupt, SystemExit):
raise
except BaseException as exc:
self._loop.call_exception_handler({
'message': 'protocol.pause_writing() failed',
'exception': exc,
'transport': self._app_transport,
'protocol': self,
})
elif size <= self._outgoing_low_water and self._app_writing_paused:
self._app_writing_paused = False
try:
self._app_protocol.resume_writing()
except (KeyboardInterrupt, SystemExit):
raise
except BaseException as exc:
self._loop.call_exception_handler({
'message': 'protocol.resume_writing() failed',
'exception': exc,
'transport': self._app_transport,
'protocol': self,
})

def _get_write_buffer_size(self):
return self._outgoing.pending + self._write_buffer_size

def _set_write_buffer_limits(self, high=None, low=None):
high, low = add_flowcontrol_defaults(
high, low, constants.FLOW_CONTROL_HIGH_WATER_SSL_WRITE)
self._outgoing_high_water = high
self._outgoing_low_water = low

# Flow control for reads to APP socket

def _pause_reading(self):
Expand Down Expand Up @@ -895,16 +808,13 @@ def pause_writing(self):
"""Called when the low-level transport's buffer goes over
the high-water mark.
"""
assert not self._ssl_writing_paused
self._ssl_writing_paused = True
self._app_protocol.pause_writing()

def resume_writing(self):
"""Called when the low-level transport's buffer drains below
the low-water mark.
"""
assert self._ssl_writing_paused
self._ssl_writing_paused = False
self._process_outgoing()
self._app_protocol.resume_writing()

def _fatal_error(self, exc, message='Fatal error on transport'):
if self._transport:
Expand Down
Loading