Skip to content

Commit 3b5162d

Browse files
committed
Merge 3.4->default: asyncio: Fix upstream issue 168: StreamReader.read(-1) from pipe may hang if data exceeds buffer limit.
2 parents 05278ee + bf88ffb commit 3b5162d

File tree

2 files changed

+47
-6
lines changed

2 files changed

+47
-6
lines changed

Lib/asyncio/streams.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -419,12 +419,17 @@ def read(self, n=-1):
419419
return b''
420420

421421
if n < 0:
422-
while not self._eof:
423-
self._waiter = self._create_waiter('read')
424-
try:
425-
yield from self._waiter
426-
finally:
427-
self._waiter = None
422+
# This used to just loop creating a new waiter hoping to
423+
# collect everything in self._buffer, but that would
424+
# deadlock if the subprocess sends more than self.limit
425+
# bytes. So just call self.read(self._limit) until EOF.
426+
blocks = []
427+
while True:
428+
block = yield from self.read(self._limit)
429+
if not block:
430+
break
431+
blocks.append(block)
432+
return b''.join(blocks)
428433
else:
429434
if not self._buffer and not self._eof:
430435
self._waiter = self._create_waiter('read')

Lib/test/test_asyncio/test_streams.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
"""Tests for streams.py."""
22

33
import gc
4+
import os
45
import socket
6+
import sys
57
import unittest
68
from unittest import mock
79
try:
@@ -583,6 +585,40 @@ def client(path):
583585
server.stop()
584586
self.assertEqual(msg, b"hello world!\n")
585587

588+
@unittest.skipIf(sys.platform == 'win32', "Don't have pipes")
589+
def test_read_all_from_pipe_reader(self):
590+
# See Tulip issue 168. This test is derived from the example
591+
# subprocess_attach_read_pipe.py, but we configure the
592+
# StreamReader's limit so that twice it is less than the size
593+
# of the data writter. Also we must explicitly attach a child
594+
# watcher to the event loop.
595+
596+
watcher = asyncio.get_child_watcher()
597+
watcher.attach_loop(self.loop)
598+
599+
code = """\
600+
import os, sys
601+
fd = int(sys.argv[1])
602+
os.write(fd, b'data')
603+
os.close(fd)
604+
"""
605+
rfd, wfd = os.pipe()
606+
args = [sys.executable, '-c', code, str(wfd)]
607+
608+
pipe = open(rfd, 'rb', 0)
609+
reader = asyncio.StreamReader(loop=self.loop, limit=1)
610+
protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
611+
transport, _ = self.loop.run_until_complete(
612+
self.loop.connect_read_pipe(lambda: protocol, pipe))
613+
614+
proc = self.loop.run_until_complete(
615+
asyncio.create_subprocess_exec(*args, pass_fds={wfd}, loop=self.loop))
616+
self.loop.run_until_complete(proc.wait())
617+
618+
os.close(wfd)
619+
data = self.loop.run_until_complete(reader.read(-1))
620+
self.assertEqual(data, b'data')
621+
586622

587623
if __name__ == '__main__':
588624
unittest.main()

0 commit comments

Comments
 (0)