1
+ import contextlib
1
2
import dataclasses
2
3
import faulthandler
3
4
import os .path
9
10
import threading
10
11
import time
11
12
import traceback
12
- from typing import Literal
13
+ from typing import Literal , TextIO
13
14
14
15
from test import support
15
16
from test .support import os_helper
16
17
17
18
from .logger import Logger
18
19
from .result import TestResult , State
19
20
from .results import TestResults
20
- from .runtests import RunTests
21
+ from .runtests import RunTests , JsonFile , JsonFileType
21
22
from .single import PROGRESS_MIN_TIME
22
23
from .utils import (
23
- StrPath , StrJSON , TestName , MS_WINDOWS ,
24
+ StrPath , StrJSON , TestName , MS_WINDOWS , TMP_PREFIX ,
24
25
format_duration , print_warning , count , plural )
25
26
from .worker import create_worker_process , USE_PROCESS_GROUP
26
27
@@ -83,6 +84,17 @@ class ExitThread(Exception):
83
84
pass
84
85
85
86
87
+ class WorkerError (Exception ):
88
+ def __init__ (self ,
89
+ test_name : TestName ,
90
+ err_msg : str | None ,
91
+ stdout : str | None ,
92
+ state : str = State .MULTIPROCESSING_ERROR ):
93
+ result = TestResult (test_name , state = state )
94
+ self .mp_result = MultiprocessResult (result , stdout , err_msg )
95
+ super ().__init__ ()
96
+
97
+
86
98
class WorkerThread (threading .Thread ):
87
99
def __init__ (self , worker_id : int , runner : "RunWorkers" ) -> None :
88
100
super ().__init__ ()
@@ -92,7 +104,7 @@ def __init__(self, worker_id: int, runner: "RunWorkers") -> None:
92
104
self .output = runner .output
93
105
self .timeout = runner .worker_timeout
94
106
self .log = runner .log
95
- self .current_test_name = None
107
+ self .test_name = None
96
108
self .start_time = None
97
109
self ._popen = None
98
110
self ._killed = False
@@ -104,7 +116,7 @@ def __repr__(self) -> str:
104
116
info .append ("running" )
105
117
else :
106
118
info .append ('stopped' )
107
- test = self .current_test_name
119
+ test = self .test_name
108
120
if test :
109
121
info .append (f'test={ test } ' )
110
122
popen = self ._popen
@@ -147,25 +159,11 @@ def stop(self) -> None:
147
159
self ._stopped = True
148
160
self ._kill ()
149
161
150
- def mp_result_error (
151
- self ,
152
- test_result : TestResult ,
153
- stdout : str | None = None ,
154
- err_msg = None
155
- ) -> MultiprocessResult :
156
- return MultiprocessResult (test_result , stdout , err_msg )
157
-
158
- def _run_process (self , runtests : RunTests , output_fd : int , json_fd : int ,
162
+ def _run_process (self , runtests : RunTests , output_fd : int ,
159
163
tmp_dir : StrPath | None = None ) -> int :
160
- try :
161
- popen = create_worker_process (runtests , output_fd , json_fd ,
162
- tmp_dir )
163
-
164
- self ._killed = False
165
- self ._popen = popen
166
- except :
167
- self .current_test_name = None
168
- raise
164
+ popen = create_worker_process (runtests , output_fd , tmp_dir )
165
+ self ._popen = popen
166
+ self ._killed = False
169
167
170
168
try :
171
169
if self ._stopped :
@@ -206,10 +204,9 @@ def _run_process(self, runtests: RunTests, output_fd: int, json_fd: int,
206
204
finally :
207
205
self ._wait_completed ()
208
206
self ._popen = None
209
- self .current_test_name = None
210
207
211
- def _runtest (self , test_name : TestName ) -> MultiprocessResult :
212
- self . current_test_name = test_name
208
+ def create_stdout (self , stack : contextlib . ExitStack ) -> TextIO :
209
+ """Create stdout temporay file (file descriptor)."""
213
210
214
211
if MS_WINDOWS :
215
212
# gh-95027: When stdout is not a TTY, Python uses the ANSI code
@@ -219,85 +216,135 @@ def _runtest(self, test_name: TestName) -> MultiprocessResult:
219
216
else :
220
217
encoding = sys .stdout .encoding
221
218
219
+ # gh-94026: Write stdout+stderr to a tempfile as workaround for
220
+ # non-blocking pipes on Emscripten with NodeJS.
221
+ stdout_file = tempfile .TemporaryFile ('w+' , encoding = encoding )
222
+ stack .enter_context (stdout_file )
223
+ return stdout_file
224
+
225
+ def create_json_file (self , stack : contextlib .ExitStack ) -> tuple [JsonFile , TextIO | None ]:
226
+ """Create JSON file."""
227
+
228
+ json_file_use_filename = self .runtests .json_file_use_filename ()
229
+ if json_file_use_filename :
230
+ # create an empty file to make the creation atomic
231
+ # (to prevent races with other worker threads)
232
+ prefix = TMP_PREFIX + 'json_'
233
+ json_fd , json_filename = tempfile .mkstemp (prefix = prefix )
234
+ os .close (json_fd )
235
+
236
+ stack .callback (os_helper .unlink , json_filename )
237
+ json_file = JsonFile (json_filename , JsonFileType .FILENAME )
238
+ json_tmpfile = None
239
+ else :
240
+ json_tmpfile = tempfile .TemporaryFile ('w+' , encoding = 'utf8' )
241
+ stack .enter_context (json_tmpfile )
242
+
243
+ json_fd = json_tmpfile .fileno ()
244
+ if MS_WINDOWS :
245
+ json_handle = msvcrt .get_osfhandle (json_fd )
246
+ json_file = JsonFile (json_handle ,
247
+ JsonFileType .WINDOWS_HANDLE )
248
+ else :
249
+ json_file = JsonFile (json_fd , JsonFileType .UNIX_FD )
250
+ return (json_file , json_tmpfile )
251
+
252
+ def create_worker_runtests (self , test_name : TestName , json_file : JsonFile ) -> RunTests :
253
+ """Create the worker RunTests."""
254
+
222
255
tests = (test_name ,)
223
256
if self .runtests .rerun :
224
257
match_tests = self .runtests .get_match_tests (test_name )
225
258
else :
226
259
match_tests = None
227
- err_msg = None
228
260
229
- # gh-94026: Write stdout+stderr to a tempfile as workaround for
230
- # non-blocking pipes on Emscripten with NodeJS.
231
- with (tempfile .TemporaryFile ('w+' , encoding = encoding ) as stdout_file ,
232
- tempfile .TemporaryFile ('w+' , encoding = 'utf8' ) as json_file ):
233
- stdout_fd = stdout_file .fileno ()
234
- json_fd = json_file .fileno ()
235
- if MS_WINDOWS :
236
- json_fd = msvcrt .get_osfhandle (json_fd )
237
-
238
- kwargs = {}
239
- if match_tests :
240
- kwargs ['match_tests' ] = match_tests
241
- worker_runtests = self .runtests .copy (
242
- tests = tests ,
243
- json_fd = json_fd ,
244
- ** kwargs )
245
-
246
- # gh-93353: Check for leaked temporary files in the parent process,
247
- # since the deletion of temporary files can happen late during
248
- # Python finalization: too late for libregrtest.
249
- if not support .is_wasi :
250
- # Don't check for leaked temporary files and directories if Python is
251
- # run on WASI. WASI don't pass environment variables like TMPDIR to
252
- # worker processes.
253
- tmp_dir = tempfile .mkdtemp (prefix = "test_python_" )
254
- tmp_dir = os .path .abspath (tmp_dir )
255
- try :
256
- retcode = self ._run_process (worker_runtests ,
257
- stdout_fd , json_fd , tmp_dir )
258
- finally :
259
- tmp_files = os .listdir (tmp_dir )
260
- os_helper .rmtree (tmp_dir )
261
- else :
261
+ kwargs = {}
262
+ if match_tests :
263
+ kwargs ['match_tests' ] = match_tests
264
+ return self .runtests .copy (
265
+ tests = tests ,
266
+ json_file = json_file ,
267
+ ** kwargs )
268
+
269
+ def run_tmp_files (self , worker_runtests : RunTests ,
270
+ stdout_fd : int ) -> (int , list [StrPath ]):
271
+ # gh-93353: Check for leaked temporary files in the parent process,
272
+ # since the deletion of temporary files can happen late during
273
+ # Python finalization: too late for libregrtest.
274
+ if not support .is_wasi :
275
+ # Don't check for leaked temporary files and directories if Python is
276
+ # run on WASI. WASI don't pass environment variables like TMPDIR to
277
+ # worker processes.
278
+ tmp_dir = tempfile .mkdtemp (prefix = "test_python_" )
279
+ tmp_dir = os .path .abspath (tmp_dir )
280
+ try :
262
281
retcode = self ._run_process (worker_runtests ,
263
- stdout_fd , json_fd )
264
- tmp_files = ()
265
- stdout_file .seek (0 )
282
+ stdout_fd , tmp_dir )
283
+ finally :
284
+ tmp_files = os .listdir (tmp_dir )
285
+ os_helper .rmtree (tmp_dir )
286
+ else :
287
+ retcode = self ._run_process (worker_runtests , stdout_fd )
288
+ tmp_files = []
266
289
267
- try :
268
- stdout = stdout_file .read ().strip ()
269
- except Exception as exc :
270
- # gh-101634: Catch UnicodeDecodeError if stdout cannot be
271
- # decoded from encoding
272
- err_msg = f"Cannot read process stdout: { exc } "
273
- result = TestResult (test_name , state = State .MULTIPROCESSING_ERROR )
274
- return self .mp_result_error (result , err_msg = err_msg )
290
+ return (retcode , tmp_files )
275
291
276
- try :
277
- # deserialize run_tests_worker() output
278
- json_file .seek (0 )
279
- worker_json : StrJSON = json_file .read ()
280
- if worker_json :
281
- result = TestResult .from_json (worker_json )
282
- else :
283
- err_msg = "empty JSON"
284
- except Exception as exc :
285
- # gh-101634: Catch UnicodeDecodeError if stdout cannot be
286
- # decoded from encoding
287
- err_msg = f"Fail to read or parser worker process JSON: { exc } "
288
- result = TestResult (test_name , state = State .MULTIPROCESSING_ERROR )
289
- return self .mp_result_error (result , stdout , err_msg = err_msg )
290
-
291
- if retcode is None :
292
- result = TestResult (test_name , state = State .TIMEOUT )
293
- return self .mp_result_error (result , stdout )
292
+ def read_stdout (self , stdout_file : TextIO ) -> str :
293
+ stdout_file .seek (0 )
294
+ try :
295
+ return stdout_file .read ().strip ()
296
+ except Exception as exc :
297
+ # gh-101634: Catch UnicodeDecodeError if stdout cannot be
298
+ # decoded from encoding
299
+ raise WorkerError (self .test_name ,
300
+ f"Cannot read process stdout: { exc } " , None )
301
+
302
+ def read_json (self , json_file : JsonFile , json_tmpfile : TextIO | None ,
303
+ stdout : str ) -> TestResult :
304
+ try :
305
+ if json_tmpfile is not None :
306
+ json_tmpfile .seek (0 )
307
+ worker_json : StrJSON = json_tmpfile .read ()
308
+ else :
309
+ with json_file .open (encoding = 'utf8' ) as json_fp :
310
+ worker_json : StrJSON = json_fp .read ()
311
+ except Exception as exc :
312
+ # gh-101634: Catch UnicodeDecodeError if stdout cannot be
313
+ # decoded from encoding
314
+ err_msg = f"Failed to read worker process JSON: { exc } "
315
+ raise WorkerError (self .test_name , err_msg , stdout ,
316
+ state = State .MULTIPROCESSING_ERROR )
317
+
318
+ if not worker_json :
319
+ raise WorkerError (self .test_name , "empty JSON" , stdout )
294
320
295
- if retcode != 0 :
296
- err_msg = "Exit code %s" % retcode
321
+ try :
322
+ return TestResult .from_json (worker_json )
323
+ except Exception as exc :
324
+ # gh-101634: Catch UnicodeDecodeError if stdout cannot be
325
+ # decoded from encoding
326
+ err_msg = f"Failed to parse worker process JSON: { exc } "
327
+ raise WorkerError (self .test_name , err_msg , stdout ,
328
+ state = State .MULTIPROCESSING_ERROR )
329
+
330
+ def _runtest (self , test_name : TestName ) -> MultiprocessResult :
331
+ with contextlib .ExitStack () as stack :
332
+ stdout_file = self .create_stdout (stack )
333
+ json_file , json_tmpfile = self .create_json_file (stack )
334
+ worker_runtests = self .create_worker_runtests (test_name , json_file )
335
+
336
+ retcode , tmp_files = self .run_tmp_files (worker_runtests ,
337
+ stdout_file .fileno ())
338
+
339
+ stdout = self .read_stdout (stdout_file )
297
340
298
- if err_msg :
299
- result = TestResult (test_name , state = State .MULTIPROCESSING_ERROR )
300
- return self .mp_result_error (result , stdout , err_msg )
341
+ if retcode is None :
342
+ raise WorkerError (self .test_name , None , stdout , state = State .TIMEOUT )
343
+
344
+ result = self .read_json (json_file , json_tmpfile , stdout )
345
+
346
+ if retcode != 0 :
347
+ raise WorkerError (self .test_name , f"Exit code { retcode } " , stdout )
301
348
302
349
if tmp_files :
303
350
msg = (f'\n \n '
@@ -319,7 +366,13 @@ def run(self) -> None:
319
366
break
320
367
321
368
self .start_time = time .monotonic ()
322
- mp_result = self ._runtest (test_name )
369
+ self .test_name = test_name
370
+ try :
371
+ mp_result = self ._runtest (test_name )
372
+ except WorkerError as exc :
373
+ mp_result = exc .mp_result
374
+ finally :
375
+ self .test_name = None
323
376
mp_result .result .duration = time .monotonic () - self .start_time
324
377
self .output .put ((False , mp_result ))
325
378
@@ -367,12 +420,12 @@ def wait_stopped(self, start_time: float) -> None:
367
420
def get_running (workers : list [WorkerThread ]) -> list [str ]:
368
421
running = []
369
422
for worker in workers :
370
- current_test_name = worker .current_test_name
371
- if not current_test_name :
423
+ test_name = worker .test_name
424
+ if not test_name :
372
425
continue
373
426
dt = time .monotonic () - worker .start_time
374
427
if dt >= PROGRESS_MIN_TIME :
375
- text = '%s (%s)' % ( current_test_name , format_duration (dt ))
428
+ text = f' { test_name } ( { format_duration (dt )} )'
376
429
running .append (text )
377
430
if not running :
378
431
return None
0 commit comments