From d19861fd5fcdae44bf0800f2b6031fcb6f464d76 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Tue, 12 Sep 2023 19:44:33 +0200 Subject: [PATCH] gh-109276: libregrtest: WASM use filename for JSON On Emscripten and WASI platforms, or if --python command line option is used, libregrtest now uses a filename for the JSON file. Emscripten and WASI buildbot workers run the main test process with a different Python (Linux) which spawns Emscripten/WASI processes using the command specified in --python command line option. Passing a file descriptor from the parent process to the child process doesn't work in this case. * Add JsonFile and JsonFileType classes * Add RunTests.json_file_use_filename() method. * Add a test in test_regrtest on the --python command line option. * test_regrtest: add parallel=False parameter. * Split long RunWorkers._runtest() function into sub-functions. --- Lib/test/libregrtest/main.py | 2 +- Lib/test/libregrtest/run_workers.py | 249 +++++++++++++++++----------- Lib/test/libregrtest/runtests.py | 68 +++++++- Lib/test/libregrtest/utils.py | 12 +- Lib/test/libregrtest/worker.py | 43 ++--- Lib/test/test_regrtest.py | 39 ++++- 6 files changed, 273 insertions(+), 140 deletions(-) diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py index dd9a10764b075a..ba493ae1796fe0 100644 --- a/Lib/test/libregrtest/main.py +++ b/Lib/test/libregrtest/main.py @@ -406,7 +406,7 @@ def create_run_tests(self, tests: TestTuple): python_cmd=self.python_cmd, randomize=self.randomize, random_seed=self.random_seed, - json_fd=None, + json_file=None, ) def _run_tests(self, selected: TestTuple, tests: TestList | None) -> int: diff --git a/Lib/test/libregrtest/run_workers.py b/Lib/test/libregrtest/run_workers.py index eaca0af17ea13a..a6bebccd0c7585 100644 --- a/Lib/test/libregrtest/run_workers.py +++ b/Lib/test/libregrtest/run_workers.py @@ -1,3 +1,4 @@ +import contextlib import dataclasses import faulthandler import os.path @@ -9,7 +10,7 @@ import threading import time import traceback -from typing import Literal +from typing import Literal, TextIO from test import support from test.support import os_helper @@ -17,10 +18,10 @@ from .logger import Logger from .result import TestResult, State from .results import TestResults -from .runtests import RunTests +from .runtests import RunTests, JsonFile, JsonFileType from .single import PROGRESS_MIN_TIME from .utils import ( - StrPath, StrJSON, TestName, MS_WINDOWS, + StrPath, StrJSON, TestName, MS_WINDOWS, TMP_PREFIX, format_duration, print_warning, count, plural) from .worker import create_worker_process, USE_PROCESS_GROUP @@ -83,6 +84,17 @@ class ExitThread(Exception): pass +class WorkerError(Exception): + def __init__(self, + test_name: TestName, + err_msg: str | None, + stdout: str | None, + state: str = State.MULTIPROCESSING_ERROR): + result = TestResult(test_name, state=state) + self.mp_result = MultiprocessResult(result, stdout, err_msg) + super().__init__() + + class WorkerThread(threading.Thread): def __init__(self, worker_id: int, runner: "RunWorkers") -> None: super().__init__() @@ -92,7 +104,7 @@ def __init__(self, worker_id: int, runner: "RunWorkers") -> None: self.output = runner.output self.timeout = runner.worker_timeout self.log = runner.log - self.current_test_name = None + self.test_name = None self.start_time = None self._popen = None self._killed = False @@ -104,7 +116,7 @@ def __repr__(self) -> str: info.append("running") else: info.append('stopped') - test = self.current_test_name + test = self.test_name if test: info.append(f'test={test}') popen = self._popen @@ -147,25 +159,11 @@ def stop(self) -> None: self._stopped = True self._kill() - def mp_result_error( - self, - test_result: TestResult, - stdout: str | None = None, - err_msg=None - ) -> MultiprocessResult: - return MultiprocessResult(test_result, stdout, err_msg) - - def _run_process(self, runtests: RunTests, output_fd: int, json_fd: int, + def _run_process(self, runtests: RunTests, output_fd: int, tmp_dir: StrPath | None = None) -> int: - try: - popen = create_worker_process(runtests, output_fd, json_fd, - tmp_dir) - - self._killed = False - self._popen = popen - except: - self.current_test_name = None - raise + popen = create_worker_process(runtests, output_fd, tmp_dir) + self._popen = popen + self._killed = False try: if self._stopped: @@ -206,10 +204,9 @@ def _run_process(self, runtests: RunTests, output_fd: int, json_fd: int, finally: self._wait_completed() self._popen = None - self.current_test_name = None - def _runtest(self, test_name: TestName) -> MultiprocessResult: - self.current_test_name = test_name + def create_stdout(self, stack: contextlib.ExitStack) -> TextIO: + """Create stdout temporay file (file descriptor).""" if MS_WINDOWS: # gh-95027: When stdout is not a TTY, Python uses the ANSI code @@ -219,85 +216,135 @@ def _runtest(self, test_name: TestName) -> MultiprocessResult: else: encoding = sys.stdout.encoding + # gh-94026: Write stdout+stderr to a tempfile as workaround for + # non-blocking pipes on Emscripten with NodeJS. + stdout_file = tempfile.TemporaryFile('w+', encoding=encoding) + stack.enter_context(stdout_file) + return stdout_file + + def create_json_file(self, stack: contextlib.ExitStack) -> tuple[JsonFile, TextIO | None]: + """Create JSON file.""" + + json_file_use_filename = self.runtests.json_file_use_filename() + if json_file_use_filename: + # create an empty file to make the creation atomic + # (to prevent races with other worker threads) + prefix = TMP_PREFIX + 'json_' + json_fd, json_filename = tempfile.mkstemp(prefix=prefix) + os.close(json_fd) + + stack.callback(os_helper.unlink, json_filename) + json_file = JsonFile(json_filename, JsonFileType.FILENAME) + json_tmpfile = None + else: + json_tmpfile = tempfile.TemporaryFile('w+', encoding='utf8') + stack.enter_context(json_tmpfile) + + json_fd = json_tmpfile.fileno() + if MS_WINDOWS: + json_handle = msvcrt.get_osfhandle(json_fd) + json_file = JsonFile(json_handle, + JsonFileType.WINDOWS_HANDLE) + else: + json_file = JsonFile(json_fd, JsonFileType.UNIX_FD) + return (json_file, json_tmpfile) + + def create_worker_runtests(self, test_name: TestName, json_file: JsonFile) -> RunTests: + """Create the worker RunTests.""" + tests = (test_name,) if self.runtests.rerun: match_tests = self.runtests.get_match_tests(test_name) else: match_tests = None - err_msg = None - # gh-94026: Write stdout+stderr to a tempfile as workaround for - # non-blocking pipes on Emscripten with NodeJS. - with (tempfile.TemporaryFile('w+', encoding=encoding) as stdout_file, - tempfile.TemporaryFile('w+', encoding='utf8') as json_file): - stdout_fd = stdout_file.fileno() - json_fd = json_file.fileno() - if MS_WINDOWS: - json_fd = msvcrt.get_osfhandle(json_fd) - - kwargs = {} - if match_tests: - kwargs['match_tests'] = match_tests - worker_runtests = self.runtests.copy( - tests=tests, - json_fd=json_fd, - **kwargs) - - # gh-93353: Check for leaked temporary files in the parent process, - # since the deletion of temporary files can happen late during - # Python finalization: too late for libregrtest. - if not support.is_wasi: - # Don't check for leaked temporary files and directories if Python is - # run on WASI. WASI don't pass environment variables like TMPDIR to - # worker processes. - tmp_dir = tempfile.mkdtemp(prefix="test_python_") - tmp_dir = os.path.abspath(tmp_dir) - try: - retcode = self._run_process(worker_runtests, - stdout_fd, json_fd, tmp_dir) - finally: - tmp_files = os.listdir(tmp_dir) - os_helper.rmtree(tmp_dir) - else: + kwargs = {} + if match_tests: + kwargs['match_tests'] = match_tests + return self.runtests.copy( + tests=tests, + json_file=json_file, + **kwargs) + + def run_tmp_files(self, worker_runtests: RunTests, + stdout_fd: int) -> (int, list[StrPath]): + # gh-93353: Check for leaked temporary files in the parent process, + # since the deletion of temporary files can happen late during + # Python finalization: too late for libregrtest. + if not support.is_wasi: + # Don't check for leaked temporary files and directories if Python is + # run on WASI. WASI don't pass environment variables like TMPDIR to + # worker processes. + tmp_dir = tempfile.mkdtemp(prefix="test_python_") + tmp_dir = os.path.abspath(tmp_dir) + try: retcode = self._run_process(worker_runtests, - stdout_fd, json_fd) - tmp_files = () - stdout_file.seek(0) + stdout_fd, tmp_dir) + finally: + tmp_files = os.listdir(tmp_dir) + os_helper.rmtree(tmp_dir) + else: + retcode = self._run_process(worker_runtests, stdout_fd) + tmp_files = [] - try: - stdout = stdout_file.read().strip() - except Exception as exc: - # gh-101634: Catch UnicodeDecodeError if stdout cannot be - # decoded from encoding - err_msg = f"Cannot read process stdout: {exc}" - result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR) - return self.mp_result_error(result, err_msg=err_msg) + return (retcode, tmp_files) - try: - # deserialize run_tests_worker() output - json_file.seek(0) - worker_json: StrJSON = json_file.read() - if worker_json: - result = TestResult.from_json(worker_json) - else: - err_msg = "empty JSON" - except Exception as exc: - # gh-101634: Catch UnicodeDecodeError if stdout cannot be - # decoded from encoding - err_msg = f"Fail to read or parser worker process JSON: {exc}" - result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR) - return self.mp_result_error(result, stdout, err_msg=err_msg) - - if retcode is None: - result = TestResult(test_name, state=State.TIMEOUT) - return self.mp_result_error(result, stdout) + def read_stdout(self, stdout_file: TextIO) -> str: + stdout_file.seek(0) + try: + return stdout_file.read().strip() + except Exception as exc: + # gh-101634: Catch UnicodeDecodeError if stdout cannot be + # decoded from encoding + raise WorkerError(self.test_name, + f"Cannot read process stdout: {exc}", None) + + def read_json(self, json_file: JsonFile, json_tmpfile: TextIO | None, + stdout: str) -> TestResult: + try: + if json_tmpfile is not None: + json_tmpfile.seek(0) + worker_json: StrJSON = json_tmpfile.read() + else: + with json_file.open(encoding='utf8') as json_fp: + worker_json: StrJSON = json_fp.read() + except Exception as exc: + # gh-101634: Catch UnicodeDecodeError if stdout cannot be + # decoded from encoding + err_msg = f"Failed to read worker process JSON: {exc}" + raise WorkerError(self.test_name, err_msg, stdout, + state=State.MULTIPROCESSING_ERROR) + + if not worker_json: + raise WorkerError(self.test_name, "empty JSON", stdout) - if retcode != 0: - err_msg = "Exit code %s" % retcode + try: + return TestResult.from_json(worker_json) + except Exception as exc: + # gh-101634: Catch UnicodeDecodeError if stdout cannot be + # decoded from encoding + err_msg = f"Failed to parse worker process JSON: {exc}" + raise WorkerError(self.test_name, err_msg, stdout, + state=State.MULTIPROCESSING_ERROR) + + def _runtest(self, test_name: TestName) -> MultiprocessResult: + with contextlib.ExitStack() as stack: + stdout_file = self.create_stdout(stack) + json_file, json_tmpfile = self.create_json_file(stack) + worker_runtests = self.create_worker_runtests(test_name, json_file) + + retcode, tmp_files = self.run_tmp_files(worker_runtests, + stdout_file.fileno()) + + stdout = self.read_stdout(stdout_file) - if err_msg: - result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR) - return self.mp_result_error(result, stdout, err_msg) + if retcode is None: + raise WorkerError(self.test_name, None, stdout, state=State.TIMEOUT) + + result = self.read_json(json_file, json_tmpfile, stdout) + + if retcode != 0: + raise WorkerError(self.test_name, f"Exit code {retcode}", stdout) if tmp_files: msg = (f'\n\n' @@ -319,7 +366,13 @@ def run(self) -> None: break self.start_time = time.monotonic() - mp_result = self._runtest(test_name) + self.test_name = test_name + try: + mp_result = self._runtest(test_name) + except WorkerError as exc: + mp_result = exc.mp_result + finally: + self.test_name = None mp_result.result.duration = time.monotonic() - self.start_time self.output.put((False, mp_result)) @@ -367,12 +420,12 @@ def wait_stopped(self, start_time: float) -> None: def get_running(workers: list[WorkerThread]) -> list[str]: running = [] for worker in workers: - current_test_name = worker.current_test_name - if not current_test_name: + test_name = worker.test_name + if not test_name: continue dt = time.monotonic() - worker.start_time if dt >= PROGRESS_MIN_TIME: - text = '%s (%s)' % (current_test_name, format_duration(dt)) + text = f'{test_name} ({format_duration(dt)})' running.append(text) if not running: return None diff --git a/Lib/test/libregrtest/runtests.py b/Lib/test/libregrtest/runtests.py index 5c68df126e2a8f..62a0a7e20c7b8b 100644 --- a/Lib/test/libregrtest/runtests.py +++ b/Lib/test/libregrtest/runtests.py @@ -1,11 +1,62 @@ +import contextlib import dataclasses import json +import os +import subprocess from typing import Any +from test import support + from .utils import ( StrPath, StrJSON, TestTuple, FilterTuple, FilterDict) +class JsonFileType: + UNIX_FD = "UNIX_FD" + WINDOWS_HANDLE = "WINDOWS_HANDLE" + FILENAME = "FILENAME" + + +@dataclasses.dataclass(slots=True, frozen=True) +class JsonFile: + # See RunTests.json_file_use_filename() + file: int | StrPath + file_type: str + + def configure_subprocess(self, popen_kwargs: dict) -> None: + match self.file_type: + case JsonFileType.UNIX_FD: + # Unix file descriptor + popen_kwargs['pass_fds'] = [self.file] + case JsonFileType.WINDOWS_HANDLE: + # Windows handle + startupinfo = subprocess.STARTUPINFO() + startupinfo.lpAttributeList = {"handle_list": [self.file]} + popen_kwargs['startupinfo'] = startupinfo + case JsonFileType.FILENAME: + # Filename: nothing to do to + pass + + @contextlib.contextmanager + def inherit_subprocess(self): + if self.file_type == JsonFileType.WINDOWS_HANDLE: + os.set_handle_inheritable(self.file, True) + try: + yield + finally: + os.set_handle_inheritable(self.file, False) + else: + yield + + def open(self, mode='r', *, encoding): + file = self.file + if self.file_type == JsonFileType.WINDOWS_HANDLE: + import msvcrt + # Create a file descriptor from the handle + file = msvcrt.open_osfhandle(file, os.O_WRONLY) + return open(file, mode, encoding=encoding) + + @dataclasses.dataclass(slots=True, frozen=True) class HuntRefleak: warmups: int @@ -38,9 +89,7 @@ class RunTests: python_cmd: tuple[str] | None randomize: bool random_seed: int | None - # On Unix, it's a file descriptor. - # On Windows, it's a handle. - json_fd: int | None + json_file: JsonFile | None def copy(self, **override): state = dataclasses.asdict(self) @@ -74,6 +123,17 @@ def as_json(self) -> StrJSON: def from_json(worker_json: StrJSON) -> 'RunTests': return json.loads(worker_json, object_hook=_decode_runtests) + def json_file_use_filename(self) -> bool: + # json_file type depends on the platform: + # - Unix: file descriptor (int) + # - Windows: handle (int) + # - Emscripten/WASI or if --python is used: filename (str) + return ( + bool(self.python_cmd) + or support.is_emscripten + or support.is_wasi + ) + class _EncodeRunTests(json.JSONEncoder): def default(self, o: Any) -> dict[str, Any]: @@ -90,6 +150,8 @@ def _decode_runtests(data: dict[str, Any]) -> RunTests | dict[str, Any]: data.pop('__runtests__') if data['hunt_refleak']: data['hunt_refleak'] = HuntRefleak(**data['hunt_refleak']) + if data['json_file']: + data['json_file'] = JsonFile(**data['json_file']) return RunTests(**data) else: return data diff --git a/Lib/test/libregrtest/utils.py b/Lib/test/libregrtest/utils.py index 03c27b9fe17053..ce7342aabfffbe 100644 --- a/Lib/test/libregrtest/utils.py +++ b/Lib/test/libregrtest/utils.py @@ -17,8 +17,12 @@ MS_WINDOWS = (sys.platform == 'win32') -WORK_DIR_PREFIX = 'test_python_' -WORKER_WORK_DIR_PREFIX = f'{WORK_DIR_PREFIX}worker_' + +# All temporary files and temporary directories created by libregrtest should +# use TMP_PREFIX so cleanup_temp_dir() can remove them all. +TMP_PREFIX = 'test_python_' +WORK_DIR_PREFIX = TMP_PREFIX +WORKER_WORK_DIR_PREFIX = WORK_DIR_PREFIX + 'worker_' # bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()). # Used to protect against threading._shutdown() hang. @@ -387,7 +391,7 @@ def get_work_dir(parent_dir: StrPath, worker: bool = False) -> StrPath: # testing (see the -j option). # Emscripten and WASI have stubbed getpid(), Emscripten has only # milisecond clock resolution. Use randint() instead. - if sys.platform in {"emscripten", "wasi"}: + if support.is_emscripten or support.is_wasi: nounce = random.randint(0, 1_000_000) else: nounce = os.getpid() @@ -580,7 +584,7 @@ def display_header(): def cleanup_temp_dir(tmp_dir: StrPath): import glob - path = os.path.join(glob.escape(tmp_dir), WORK_DIR_PREFIX + '*') + path = os.path.join(glob.escape(tmp_dir), TMP_PREFIX + '*') print("Cleanup %s directory" % tmp_dir) for name in glob.glob(path): if os.path.isdir(name): diff --git a/Lib/test/libregrtest/worker.py b/Lib/test/libregrtest/worker.py index 0963faa2e4d2a1..ae3dcb2b06ed36 100644 --- a/Lib/test/libregrtest/worker.py +++ b/Lib/test/libregrtest/worker.py @@ -7,18 +7,17 @@ from test.support import os_helper from .setup import setup_process, setup_test_dir -from .runtests import RunTests +from .runtests import RunTests, JsonFile from .single import run_single_test from .utils import ( - StrPath, StrJSON, FilterTuple, MS_WINDOWS, + StrPath, StrJSON, FilterTuple, get_temp_dir, get_work_dir, exit_timeout) USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg")) -def create_worker_process(runtests: RunTests, - output_fd: int, json_fd: int, +def create_worker_process(runtests: RunTests, output_fd: int, tmp_dir: StrPath | None = None) -> subprocess.Popen: python_cmd = runtests.python_cmd worker_json = runtests.as_json() @@ -55,34 +54,24 @@ def create_worker_process(runtests: RunTests, close_fds=True, cwd=work_dir, ) - if not MS_WINDOWS: - kwargs['pass_fds'] = [json_fd] - else: - startupinfo = subprocess.STARTUPINFO() - startupinfo.lpAttributeList = {"handle_list": [json_fd]} - kwargs['startupinfo'] = startupinfo - if USE_PROCESS_GROUP: - kwargs['start_new_session'] = True - - if MS_WINDOWS: - os.set_handle_inheritable(json_fd, True) - try: + + # Pass json_file to the worker process + json_file = runtests.json_file + json_file.configure_subprocess(kwargs) + + with json_file.inherit_subprocess(): return subprocess.Popen(cmd, **kwargs) - finally: - if MS_WINDOWS: - os.set_handle_inheritable(json_fd, False) def worker_process(worker_json: StrJSON) -> NoReturn: runtests = RunTests.from_json(worker_json) test_name = runtests.tests[0] match_tests: FilterTuple | None = runtests.match_tests - json_fd: int = runtests.json_fd - - if MS_WINDOWS: - import msvcrt - json_fd = msvcrt.open_osfhandle(json_fd, os.O_WRONLY) - + # json_file type depends on the platform: + # - Unix: file descriptor (int) + # - Windows: handle (int) + # - Emscripten/WASI or if --python is used: filename (str) + json_file: JsonFile = runtests.json_file setup_test_dir(runtests.test_dir) setup_process() @@ -96,8 +85,8 @@ def worker_process(worker_json: StrJSON) -> NoReturn: result = run_single_test(test_name, runtests) - with open(json_fd, 'w', encoding='utf-8') as json_file: - result.write_json_into(json_file) + with json_file.open('w', encoding='utf-8') as json_fp: + result.write_json_into(json_fp) sys.exit(0) diff --git a/Lib/test/test_regrtest.py b/Lib/test/test_regrtest.py index 7cf3d05a6e6d70..55cf9e7f020721 100644 --- a/Lib/test/test_regrtest.py +++ b/Lib/test/test_regrtest.py @@ -13,6 +13,7 @@ import platform import random import re +import shlex import subprocess import sys import sysconfig @@ -432,13 +433,14 @@ def parse_executed_tests(self, output): parser = re.finditer(regex, output, re.MULTILINE) return list(match.group(1) for match in parser) - def check_executed_tests(self, output, tests, skipped=(), failed=(), + def check_executed_tests(self, output, tests, *, stats, + skipped=(), failed=(), env_changed=(), omitted=(), rerun=None, run_no_tests=(), resource_denied=(), - randomize=False, interrupted=False, + randomize=False, parallel=False, interrupted=False, fail_env_changed=False, - *, stats, forever=False, filtered=False): + forever=False, filtered=False): if isinstance(tests, str): tests = [tests] if isinstance(skipped, str): @@ -455,6 +457,8 @@ def check_executed_tests(self, output, tests, skipped=(), failed=(), run_no_tests = [run_no_tests] if isinstance(stats, int): stats = TestStats(stats) + if parallel: + randomize = True rerun_failed = [] if rerun is not None: @@ -1120,7 +1124,7 @@ def test_crashed(self): tests = [crash_test] output = self.run_tests("-j2", *tests, exitcode=EXITCODE_BAD_TEST) self.check_executed_tests(output, tests, failed=crash_test, - randomize=True, stats=0) + parallel=True, stats=0) def parse_methods(self, output): regex = re.compile("^(test[^ ]+).*ok$", flags=re.MULTILINE) @@ -1744,7 +1748,7 @@ def test_leak_tmp_file(self): self.check_executed_tests(output, testnames, env_changed=testnames, fail_env_changed=True, - randomize=True, + parallel=True, stats=len(testnames)) for testname in testnames: self.assertIn(f"Warning -- {testname} leaked temporary " @@ -1784,7 +1788,7 @@ def test_mp_decode_error(self): exitcode=EXITCODE_BAD_TEST) self.check_executed_tests(output, [testname], failed=[testname], - randomize=True, + parallel=True, stats=0) def test_doctest(self): @@ -1823,7 +1827,7 @@ def load_tests(loader, tests, pattern): exitcode=EXITCODE_BAD_TEST) self.check_executed_tests(output, [testname], failed=[testname], - randomize=True, + parallel=True, stats=TestStats(1, 1, 0)) def _check_random_seed(self, run_workers: bool): @@ -1866,6 +1870,27 @@ def test_random_seed(self): def test_random_seed_workers(self): self._check_random_seed(run_workers=True) + def test_python_command(self): + code = textwrap.dedent(r""" + import sys + import unittest + + class WorkerTests(unittest.TestCase): + def test_dev_mode(self): + self.assertTrue(sys.flags.dev_mode) + """) + tests = [self.create_test(code=code) for _ in range(3)] + + # Custom Python command: "python -X dev" + python_cmd = [sys.executable, '-X', 'dev'] + # test.libregrtest.cmdline uses shlex.split() to parse the Python + # command line string + python_cmd = shlex.join(python_cmd) + + output = self.run_tests("--python", python_cmd, "-j0", *tests) + self.check_executed_tests(output, tests, + stats=len(tests), parallel=True) + class TestUtils(unittest.TestCase): def test_format_duration(self):