Skip to content

Commit a076e4f

Browse files
authored
bpo-36802: Drop awrite()/aclose(), support await write() and await close() instead (#13099)
1 parent 3b2f9ab commit a076e4f

File tree

4 files changed

+118
-51
lines changed

4 files changed

+118
-51
lines changed

Doc/library/asyncio-stream.rst

+59-31
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@ streams::
2222
'127.0.0.1', 8888)
2323

2424
print(f'Send: {message!r}')
25-
await writer.awrite(message.encode())
25+
await writer.write(message.encode())
2626

2727
data = await reader.read(100)
2828
print(f'Received: {data.decode()!r}')
2929

3030
print('Close the connection')
31-
await writer.aclose()
31+
await writer.close()
3232

3333
asyncio.run(tcp_echo_client('Hello World!'))
3434

@@ -226,23 +226,70 @@ StreamWriter
226226
directly; use :func:`open_connection` and :func:`start_server`
227227
instead.
228228

229-
.. coroutinemethod:: awrite(data)
229+
.. method:: write(data)
230+
231+
The method attempts to write the *data* to the underlying socket immediately.
232+
If that fails, the data is queued in an internal write buffer until it can be
233+
sent.
234+
235+
Starting with Python 3.8, it is possible to directly await on the `write()`
236+
method::
237+
238+
await stream.write(data)
239+
240+
The ``await`` pauses the current coroutine until the data is written to the
241+
socket.
242+
243+
Below is an equivalent code that works with Python <= 3.7::
244+
245+
stream.write(data)
246+
await stream.drain()
247+
248+
.. versionchanged:: 3.8
249+
Support ``await stream.write(...)`` syntax.
250+
251+
.. method:: writelines(data)
252+
253+
The method writes a list (or any iterable) of bytes to the underlying socket
254+
immediately.
255+
If that fails, the data is queued in an internal write buffer until it can be
256+
sent.
257+
258+
Starting with Python 3.8, it is possible to directly await on the `write()`
259+
method::
260+
261+
await stream.writelines(lines)
262+
263+
The ``await`` pauses the current coroutine until the data is written to the
264+
socket.
265+
266+
Below is an equivalent code that works with Python <= 3.7::
230267

231-
Write *data* to the stream.
268+
stream.writelines(lines)
269+
await stream.drain()
232270

233-
The method respects flow control, execution is paused if the write
234-
buffer reaches the high watermark.
271+
.. versionchanged:: 3.8
272+
Support ``await stream.writelines()`` syntax.
235273

236-
.. versionadded:: 3.8
274+
.. method:: close()
275+
276+
The method closes the stream and the underlying socket.
277+
278+
Starting with Python 3.8, it is possible to directly await on the `close()`
279+
method::
280+
281+
await stream.close()
237282

238-
.. coroutinemethod:: aclose()
283+
The ``await`` pauses the current coroutine until the stream and the underlying
284+
socket are closed (and SSL shutdown is performed for a secure connection).
239285

240-
Close the stream.
286+
Below is an equivalent code that works with Python <= 3.7::
241287

242-
Wait until all closing actions are complete, e.g. SSL shutdown for
243-
secure sockets.
288+
stream.close()
289+
await stream.wait_closed()
244290

245-
.. versionadded:: 3.8
291+
.. versionchanged:: 3.8
292+
Support ``await stream.close()`` syntax.
246293

247294
.. method:: can_write_eof()
248295

@@ -263,21 +310,6 @@ StreamWriter
263310
Access optional transport information; see
264311
:meth:`BaseTransport.get_extra_info` for details.
265312

266-
.. method:: write(data)
267-
268-
Write *data* to the stream.
269-
270-
This method is not subject to flow control. Calls to ``write()`` should
271-
be followed by :meth:`drain`. The :meth:`awrite` method is a
272-
recommended alternative the applies flow control automatically.
273-
274-
.. method:: writelines(data)
275-
276-
Write a list (or any iterable) of bytes to the stream.
277-
278-
This method is not subject to flow control. Calls to ``writelines()``
279-
should be followed by :meth:`drain`.
280-
281313
.. coroutinemethod:: drain()
282314

283315
Wait until it is appropriate to resume writing to the stream.
@@ -293,10 +325,6 @@ StreamWriter
293325
be resumed. When there is nothing to wait for, the :meth:`drain`
294326
returns immediately.
295327

296-
.. method:: close()
297-
298-
Close the stream.
299-
300328
.. method:: is_closing()
301329

302330
Return ``True`` if the stream is closed or in the process of

Lib/asyncio/streams.py

+27-8
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,8 @@ def __init__(self, transport, protocol, reader, loop,
352352
assert reader is None or isinstance(reader, StreamReader)
353353
self._reader = reader
354354
self._loop = loop
355+
self._complete_fut = self._loop.create_future()
356+
self._complete_fut.set_result(None)
355357

356358
def __repr__(self):
357359
info = [self.__class__.__name__, f'transport={self._transport!r}']
@@ -365,9 +367,33 @@ def transport(self):
365367

366368
def write(self, data):
367369
self._transport.write(data)
370+
return self._fast_drain()
368371

369372
def writelines(self, data):
370373
self._transport.writelines(data)
374+
return self._fast_drain()
375+
376+
def _fast_drain(self):
377+
# The helper tries to use fast-path to return already existing complete future
378+
# object if underlying transport is not paused and actual waiting for writing
379+
# resume is not needed
380+
if self._reader is not None:
381+
# this branch will be simplified after merging reader with writer
382+
exc = self._reader.exception()
383+
if exc is not None:
384+
fut = self._loop.create_future()
385+
fut.set_exception(exc)
386+
return fut
387+
if not self._transport.is_closing():
388+
if self._protocol._connection_lost:
389+
fut = self._loop.create_future()
390+
fut.set_exception(ConnectionResetError('Connection lost'))
391+
return fut
392+
if not self._protocol._paused:
393+
# fast path, the stream is not paused
394+
# no need to wait for resume signal
395+
return self._complete_fut
396+
return self._loop.create_task(self.drain())
371397

372398
def write_eof(self):
373399
return self._transport.write_eof()
@@ -377,6 +403,7 @@ def can_write_eof(self):
377403

378404
def close(self):
379405
self._transport.close()
406+
return self._protocol._get_close_waiter(self)
380407

381408
def is_closing(self):
382409
return self._transport.is_closing()
@@ -408,14 +435,6 @@ async def drain(self):
408435
raise ConnectionResetError('Connection lost')
409436
await self._protocol._drain_helper()
410437

411-
async def aclose(self):
412-
self.close()
413-
await self.wait_closed()
414-
415-
async def awrite(self, data):
416-
self.write(data)
417-
await self.drain()
418-
419438

420439
class StreamReader:
421440

Lib/test/test_asyncio/test_streams.py

+30-12
Original file line numberDiff line numberDiff line change
@@ -1035,24 +1035,42 @@ def test_del_stream_before_connection_made(self):
10351035
messages[0]['message'])
10361036

10371037
def test_async_writer_api(self):
1038+
async def inner(httpd):
1039+
rd, wr = await asyncio.open_connection(*httpd.address)
1040+
1041+
await wr.write(b'GET / HTTP/1.0\r\n\r\n')
1042+
data = await rd.readline()
1043+
self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
1044+
data = await rd.read()
1045+
self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
1046+
await wr.close()
1047+
10381048
messages = []
10391049
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
10401050

10411051
with test_utils.run_test_server() as httpd:
1042-
rd, wr = self.loop.run_until_complete(
1043-
asyncio.open_connection(*httpd.address,
1044-
loop=self.loop))
1052+
self.loop.run_until_complete(inner(httpd))
10451053

1046-
f = wr.awrite(b'GET / HTTP/1.0\r\n\r\n')
1047-
self.loop.run_until_complete(f)
1048-
f = rd.readline()
1049-
data = self.loop.run_until_complete(f)
1054+
self.assertEqual(messages, [])
1055+
1056+
def test_async_writer_api(self):
1057+
async def inner(httpd):
1058+
rd, wr = await asyncio.open_connection(*httpd.address)
1059+
1060+
await wr.write(b'GET / HTTP/1.0\r\n\r\n')
1061+
data = await rd.readline()
10501062
self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
1051-
f = rd.read()
1052-
data = self.loop.run_until_complete(f)
1063+
data = await rd.read()
10531064
self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
1054-
f = wr.aclose()
1055-
self.loop.run_until_complete(f)
1065+
wr.close()
1066+
with self.assertRaises(ConnectionResetError):
1067+
await wr.write(b'data')
1068+
1069+
messages = []
1070+
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
1071+
1072+
with test_utils.run_test_server() as httpd:
1073+
self.loop.run_until_complete(inner(httpd))
10561074

10571075
self.assertEqual(messages, [])
10581076

@@ -1066,7 +1084,7 @@ def test_eof_feed_when_closing_writer(self):
10661084
asyncio.open_connection(*httpd.address,
10671085
loop=self.loop))
10681086

1069-
f = wr.aclose()
1087+
f = wr.close()
10701088
self.loop.run_until_complete(f)
10711089
assert rd.at_eof()
10721090
f = rd.read()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Provide both sync and async calls for StreamWriter.write() and
2+
StreamWriter.close()

0 commit comments

Comments
 (0)