Skip to content

Commit cbbdf2c

Browse files
authored
gh-109709: Fix asyncio test_stdin_broken_pipe() (#109710)
Replace harcoded sleep of 500 ms with synchronization using a pipe. Fix also Process._feed_stdin(): catch also BrokenPipeError on stdin.write(input), not only on stdin.drain().
1 parent 46b63ce commit cbbdf2c

File tree

2 files changed

+50
-16
lines changed

2 files changed

+50
-16
lines changed

Lib/asyncio/subprocess.py

+8-6
Original file line numberDiff line numberDiff line change
@@ -147,15 +147,17 @@ def kill(self):
147147

148148
async def _feed_stdin(self, input):
149149
debug = self._loop.get_debug()
150-
if input is not None:
151-
self.stdin.write(input)
152-
if debug:
153-
logger.debug(
154-
'%r communicate: feed stdin (%s bytes)', self, len(input))
155150
try:
151+
if input is not None:
152+
self.stdin.write(input)
153+
if debug:
154+
logger.debug(
155+
'%r communicate: feed stdin (%s bytes)', self, len(input))
156+
156157
await self.stdin.drain()
157158
except (BrokenPipeError, ConnectionResetError) as exc:
158-
# communicate() ignores BrokenPipeError and ConnectionResetError
159+
# communicate() ignores BrokenPipeError and ConnectionResetError.
160+
# write() and drain() can raise these exceptions.
159161
if debug:
160162
logger.debug('%r communicate: stdin got %r', self, exc)
161163

Lib/test/test_asyncio/test_subprocess.py

+42-10
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import os
22
import signal
33
import sys
4+
import textwrap
45
import unittest
56
import warnings
67
from unittest import mock
@@ -12,9 +13,14 @@
1213
from test import support
1314
from test.support import os_helper
1415

15-
if sys.platform != 'win32':
16+
17+
MS_WINDOWS = (sys.platform == 'win32')
18+
if MS_WINDOWS:
19+
import msvcrt
20+
else:
1621
from asyncio import unix_events
1722

23+
1824
if support.check_sanitizer(address=True):
1925
raise unittest.SkipTest("Exposes ASAN flakiness in GitHub CI")
2026

@@ -270,26 +276,43 @@ async def send_signal(proc):
270276
finally:
271277
signal.signal(signal.SIGHUP, old_handler)
272278

273-
def prepare_broken_pipe_test(self):
279+
def test_stdin_broken_pipe(self):
274280
# buffer large enough to feed the whole pipe buffer
275281
large_data = b'x' * support.PIPE_MAX_SIZE
276282

283+
rfd, wfd = os.pipe()
284+
self.addCleanup(os.close, rfd)
285+
self.addCleanup(os.close, wfd)
286+
if MS_WINDOWS:
287+
handle = msvcrt.get_osfhandle(rfd)
288+
os.set_handle_inheritable(handle, True)
289+
code = textwrap.dedent(f'''
290+
import os, msvcrt
291+
handle = {handle}
292+
fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
293+
os.read(fd, 1)
294+
''')
295+
from subprocess import STARTUPINFO
296+
startupinfo = STARTUPINFO()
297+
startupinfo.lpAttributeList = {"handle_list": [handle]}
298+
kwargs = dict(startupinfo=startupinfo)
299+
else:
300+
code = f'import os; fd = {rfd}; os.read(fd, 1)'
301+
kwargs = dict(pass_fds=(rfd,))
302+
277303
# the program ends before the stdin can be fed
278304
proc = self.loop.run_until_complete(
279305
asyncio.create_subprocess_exec(
280-
sys.executable, '-c', 'pass',
306+
sys.executable, '-c', code,
281307
stdin=subprocess.PIPE,
308+
**kwargs
282309
)
283310
)
284311

285-
return (proc, large_data)
286-
287-
def test_stdin_broken_pipe(self):
288-
proc, large_data = self.prepare_broken_pipe_test()
289-
290312
async def write_stdin(proc, data):
291-
await asyncio.sleep(0.5)
292313
proc.stdin.write(data)
314+
# Only exit the child process once the write buffer is filled
315+
os.write(wfd, b'go')
293316
await proc.stdin.drain()
294317

295318
coro = write_stdin(proc, large_data)
@@ -300,7 +323,16 @@ async def write_stdin(proc, data):
300323
self.loop.run_until_complete(proc.wait())
301324

302325
def test_communicate_ignore_broken_pipe(self):
303-
proc, large_data = self.prepare_broken_pipe_test()
326+
# buffer large enough to feed the whole pipe buffer
327+
large_data = b'x' * support.PIPE_MAX_SIZE
328+
329+
# the program ends before the stdin can be fed
330+
proc = self.loop.run_until_complete(
331+
asyncio.create_subprocess_exec(
332+
sys.executable, '-c', 'pass',
333+
stdin=subprocess.PIPE,
334+
)
335+
)
304336

305337
# communicate() must ignore BrokenPipeError when feeding stdin
306338
self.loop.set_exception_handler(lambda loop, msg: None)

0 commit comments

Comments
 (0)