2
2
import shutil
3
3
import signal
4
4
import sys
5
+ import textwrap
5
6
import unittest
6
7
import warnings
7
8
from unittest import mock
13
14
from test import support
14
15
from test .support import os_helper
15
16
16
- if sys .platform != 'win32' :
17
+
18
+ MS_WINDOWS = (sys .platform == 'win32' )
19
+ if MS_WINDOWS :
20
+ import msvcrt
21
+ else :
17
22
from asyncio import unix_events
18
23
24
+
19
25
if support .check_sanitizer (address = True ):
20
26
raise unittest .SkipTest ("Exposes ASAN flakiness in GitHub CI" )
21
27
@@ -253,26 +259,43 @@ async def send_signal(proc):
253
259
finally :
254
260
signal .signal (signal .SIGHUP , old_handler )
255
261
256
- def prepare_broken_pipe_test (self ):
262
+ def test_stdin_broken_pipe (self ):
257
263
# buffer large enough to feed the whole pipe buffer
258
264
large_data = b'x' * support .PIPE_MAX_SIZE
259
265
266
+ rfd , wfd = os .pipe ()
267
+ self .addCleanup (os .close , rfd )
268
+ self .addCleanup (os .close , wfd )
269
+ if MS_WINDOWS :
270
+ handle = msvcrt .get_osfhandle (rfd )
271
+ os .set_handle_inheritable (handle , True )
272
+ code = textwrap .dedent (f'''
273
+ import os, msvcrt
274
+ handle = { handle }
275
+ fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
276
+ os.read(fd, 1)
277
+ ''' )
278
+ from subprocess import STARTUPINFO
279
+ startupinfo = STARTUPINFO ()
280
+ startupinfo .lpAttributeList = {"handle_list" : [handle ]}
281
+ kwargs = dict (startupinfo = startupinfo )
282
+ else :
283
+ code = f'import os; fd = { rfd } ; os.read(fd, 1)'
284
+ kwargs = dict (pass_fds = (rfd ,))
285
+
260
286
# the program ends before the stdin can be fed
261
287
proc = self .loop .run_until_complete (
262
288
asyncio .create_subprocess_exec (
263
- sys .executable , '-c' , 'pass' ,
289
+ sys .executable , '-c' , code ,
264
290
stdin = subprocess .PIPE ,
291
+ ** kwargs
265
292
)
266
293
)
267
294
268
- return (proc , large_data )
269
-
270
- def test_stdin_broken_pipe (self ):
271
- proc , large_data = self .prepare_broken_pipe_test ()
272
-
273
295
async def write_stdin (proc , data ):
274
- await asyncio .sleep (0.5 )
275
296
proc .stdin .write (data )
297
+ # Only exit the child process once the write buffer is filled
298
+ os .write (wfd , b'go' )
276
299
await proc .stdin .drain ()
277
300
278
301
coro = write_stdin (proc , large_data )
@@ -283,7 +306,16 @@ async def write_stdin(proc, data):
283
306
self .loop .run_until_complete (proc .wait ())
284
307
285
308
def test_communicate_ignore_broken_pipe (self ):
286
- proc , large_data = self .prepare_broken_pipe_test ()
309
+ # buffer large enough to feed the whole pipe buffer
310
+ large_data = b'x' * support .PIPE_MAX_SIZE
311
+
312
+ # the program ends before the stdin can be fed
313
+ proc = self .loop .run_until_complete (
314
+ asyncio .create_subprocess_exec (
315
+ sys .executable , '-c' , 'pass' ,
316
+ stdin = subprocess .PIPE ,
317
+ )
318
+ )
287
319
288
320
# communicate() must ignore BrokenPipeError when feeding stdin
289
321
self .loop .set_exception_handler (lambda loop , msg : None )
0 commit comments