Skip to content

[3.11] gh-109709: Fix asyncio test_stdin_broken_pipe() (#109710) #109735

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions Lib/asyncio/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,16 @@ def kill(self):

async def _feed_stdin(self, input):
debug = self._loop.get_debug()
self.stdin.write(input)
if debug:
logger.debug(
'%r communicate: feed stdin (%s bytes)', self, len(input))
try:
self.stdin.write(input)
if debug:
logger.debug(
'%r communicate: feed stdin (%s bytes)', self, len(input))

await self.stdin.drain()
except (BrokenPipeError, ConnectionResetError) as exc:
# communicate() ignores BrokenPipeError and ConnectionResetError
# communicate() ignores BrokenPipeError and ConnectionResetError.
# write() and drain() can raise these exceptions.
if debug:
logger.debug('%r communicate: stdin got %r', self, exc)

Expand Down
52 changes: 42 additions & 10 deletions Lib/test/test_asyncio/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import shutil
import signal
import sys
import textwrap
import unittest
import warnings
from unittest import mock
Expand All @@ -13,9 +14,14 @@
from test import support
from test.support import os_helper

if sys.platform != 'win32':

MS_WINDOWS = (sys.platform == 'win32')
if MS_WINDOWS:
import msvcrt
else:
from asyncio import unix_events


if support.check_sanitizer(address=True):
raise unittest.SkipTest("Exposes ASAN flakiness in GitHub CI")

Expand Down Expand Up @@ -253,26 +259,43 @@ async def send_signal(proc):
finally:
signal.signal(signal.SIGHUP, old_handler)

def prepare_broken_pipe_test(self):
def test_stdin_broken_pipe(self):
# buffer large enough to feed the whole pipe buffer
large_data = b'x' * support.PIPE_MAX_SIZE

rfd, wfd = os.pipe()
self.addCleanup(os.close, rfd)
self.addCleanup(os.close, wfd)
if MS_WINDOWS:
handle = msvcrt.get_osfhandle(rfd)
os.set_handle_inheritable(handle, True)
code = textwrap.dedent(f'''
import os, msvcrt
handle = {handle}
fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
os.read(fd, 1)
''')
from subprocess import STARTUPINFO
startupinfo = STARTUPINFO()
startupinfo.lpAttributeList = {"handle_list": [handle]}
kwargs = dict(startupinfo=startupinfo)
else:
code = f'import os; fd = {rfd}; os.read(fd, 1)'
kwargs = dict(pass_fds=(rfd,))

# the program ends before the stdin can be fed
proc = self.loop.run_until_complete(
asyncio.create_subprocess_exec(
sys.executable, '-c', 'pass',
sys.executable, '-c', code,
stdin=subprocess.PIPE,
**kwargs
)
)

return (proc, large_data)

def test_stdin_broken_pipe(self):
proc, large_data = self.prepare_broken_pipe_test()

async def write_stdin(proc, data):
await asyncio.sleep(0.5)
proc.stdin.write(data)
# Only exit the child process once the write buffer is filled
os.write(wfd, b'go')
await proc.stdin.drain()

coro = write_stdin(proc, large_data)
Expand All @@ -283,7 +306,16 @@ async def write_stdin(proc, data):
self.loop.run_until_complete(proc.wait())

def test_communicate_ignore_broken_pipe(self):
proc, large_data = self.prepare_broken_pipe_test()
# buffer large enough to feed the whole pipe buffer
large_data = b'x' * support.PIPE_MAX_SIZE

# the program ends before the stdin can be fed
proc = self.loop.run_until_complete(
asyncio.create_subprocess_exec(
sys.executable, '-c', 'pass',
stdin=subprocess.PIPE,
)
)

# communicate() must ignore BrokenPipeError when feeding stdin
self.loop.set_exception_handler(lambda loop, msg: None)
Expand Down