Skip to content

gh-109276: libregrtest: WASM use filename for JSON #109340

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Lib/test/libregrtest/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ def create_run_tests(self, tests: TestTuple):
python_cmd=self.python_cmd,
randomize=self.randomize,
random_seed=self.random_seed,
json_fd=None,
json_file=None,
)

def _run_tests(self, selected: TestTuple, tests: TestList | None) -> int:
Expand Down
249 changes: 151 additions & 98 deletions Lib/test/libregrtest/run_workers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import contextlib
import dataclasses
import faulthandler
import os.path
Expand All @@ -9,18 +10,18 @@
import threading
import time
import traceback
from typing import Literal
from typing import Literal, TextIO

from test import support
from test.support import os_helper

from .logger import Logger
from .result import TestResult, State
from .results import TestResults
from .runtests import RunTests
from .runtests import RunTests, JsonFile, JsonFileType
from .single import PROGRESS_MIN_TIME
from .utils import (
StrPath, StrJSON, TestName, MS_WINDOWS,
StrPath, StrJSON, TestName, MS_WINDOWS, TMP_PREFIX,
format_duration, print_warning, count, plural)
from .worker import create_worker_process, USE_PROCESS_GROUP

Expand Down Expand Up @@ -83,6 +84,17 @@ class ExitThread(Exception):
pass


class WorkerError(Exception):
def __init__(self,
test_name: TestName,
err_msg: str | None,
stdout: str | None,
state: str = State.MULTIPROCESSING_ERROR):
result = TestResult(test_name, state=state)
self.mp_result = MultiprocessResult(result, stdout, err_msg)
super().__init__()


class WorkerThread(threading.Thread):
def __init__(self, worker_id: int, runner: "RunWorkers") -> None:
super().__init__()
Expand All @@ -92,7 +104,7 @@ def __init__(self, worker_id: int, runner: "RunWorkers") -> None:
self.output = runner.output
self.timeout = runner.worker_timeout
self.log = runner.log
self.current_test_name = None
self.test_name = None
self.start_time = None
self._popen = None
self._killed = False
Expand All @@ -104,7 +116,7 @@ def __repr__(self) -> str:
info.append("running")
else:
info.append('stopped')
test = self.current_test_name
test = self.test_name
if test:
info.append(f'test={test}')
popen = self._popen
Expand Down Expand Up @@ -147,25 +159,11 @@ def stop(self) -> None:
self._stopped = True
self._kill()

def mp_result_error(
self,
test_result: TestResult,
stdout: str | None = None,
err_msg=None
) -> MultiprocessResult:
return MultiprocessResult(test_result, stdout, err_msg)

def _run_process(self, runtests: RunTests, output_fd: int, json_fd: int,
def _run_process(self, runtests: RunTests, output_fd: int,
tmp_dir: StrPath | None = None) -> int:
try:
popen = create_worker_process(runtests, output_fd, json_fd,
tmp_dir)

self._killed = False
self._popen = popen
except:
self.current_test_name = None
raise
popen = create_worker_process(runtests, output_fd, tmp_dir)
self._popen = popen
self._killed = False

try:
if self._stopped:
Expand Down Expand Up @@ -206,10 +204,9 @@ def _run_process(self, runtests: RunTests, output_fd: int, json_fd: int,
finally:
self._wait_completed()
self._popen = None
self.current_test_name = None

def _runtest(self, test_name: TestName) -> MultiprocessResult:
self.current_test_name = test_name
def create_stdout(self, stack: contextlib.ExitStack) -> TextIO:
"""Create stdout temporay file (file descriptor)."""

if MS_WINDOWS:
# gh-95027: When stdout is not a TTY, Python uses the ANSI code
Expand All @@ -219,85 +216,135 @@ def _runtest(self, test_name: TestName) -> MultiprocessResult:
else:
encoding = sys.stdout.encoding

# gh-94026: Write stdout+stderr to a tempfile as workaround for
# non-blocking pipes on Emscripten with NodeJS.
stdout_file = tempfile.TemporaryFile('w+', encoding=encoding)
stack.enter_context(stdout_file)
return stdout_file

def create_json_file(self, stack: contextlib.ExitStack) -> tuple[JsonFile, TextIO | None]:
"""Create JSON file."""

json_file_use_filename = self.runtests.json_file_use_filename()
if json_file_use_filename:
# create an empty file to make the creation atomic
# (to prevent races with other worker threads)
prefix = TMP_PREFIX + 'json_'
json_fd, json_filename = tempfile.mkstemp(prefix=prefix)
os.close(json_fd)

stack.callback(os_helper.unlink, json_filename)
json_file = JsonFile(json_filename, JsonFileType.FILENAME)
json_tmpfile = None
else:
json_tmpfile = tempfile.TemporaryFile('w+', encoding='utf8')
stack.enter_context(json_tmpfile)

json_fd = json_tmpfile.fileno()
if MS_WINDOWS:
json_handle = msvcrt.get_osfhandle(json_fd)
json_file = JsonFile(json_handle,
JsonFileType.WINDOWS_HANDLE)
else:
json_file = JsonFile(json_fd, JsonFileType.UNIX_FD)
return (json_file, json_tmpfile)

def create_worker_runtests(self, test_name: TestName, json_file: JsonFile) -> RunTests:
"""Create the worker RunTests."""

tests = (test_name,)
if self.runtests.rerun:
match_tests = self.runtests.get_match_tests(test_name)
else:
match_tests = None
err_msg = None

# gh-94026: Write stdout+stderr to a tempfile as workaround for
# non-blocking pipes on Emscripten with NodeJS.
with (tempfile.TemporaryFile('w+', encoding=encoding) as stdout_file,
tempfile.TemporaryFile('w+', encoding='utf8') as json_file):
stdout_fd = stdout_file.fileno()
json_fd = json_file.fileno()
if MS_WINDOWS:
json_fd = msvcrt.get_osfhandle(json_fd)

kwargs = {}
if match_tests:
kwargs['match_tests'] = match_tests
worker_runtests = self.runtests.copy(
tests=tests,
json_fd=json_fd,
**kwargs)

# gh-93353: Check for leaked temporary files in the parent process,
# since the deletion of temporary files can happen late during
# Python finalization: too late for libregrtest.
if not support.is_wasi:
# Don't check for leaked temporary files and directories if Python is
# run on WASI. WASI don't pass environment variables like TMPDIR to
# worker processes.
tmp_dir = tempfile.mkdtemp(prefix="test_python_")
tmp_dir = os.path.abspath(tmp_dir)
try:
retcode = self._run_process(worker_runtests,
stdout_fd, json_fd, tmp_dir)
finally:
tmp_files = os.listdir(tmp_dir)
os_helper.rmtree(tmp_dir)
else:
kwargs = {}
if match_tests:
kwargs['match_tests'] = match_tests
return self.runtests.copy(
tests=tests,
json_file=json_file,
**kwargs)

def run_tmp_files(self, worker_runtests: RunTests,
stdout_fd: int) -> (int, list[StrPath]):
# gh-93353: Check for leaked temporary files in the parent process,
# since the deletion of temporary files can happen late during
# Python finalization: too late for libregrtest.
if not support.is_wasi:
# Don't check for leaked temporary files and directories if Python is
# run on WASI. WASI don't pass environment variables like TMPDIR to
# worker processes.
tmp_dir = tempfile.mkdtemp(prefix="test_python_")
tmp_dir = os.path.abspath(tmp_dir)
try:
retcode = self._run_process(worker_runtests,
stdout_fd, json_fd)
tmp_files = ()
stdout_file.seek(0)
stdout_fd, tmp_dir)
finally:
tmp_files = os.listdir(tmp_dir)
os_helper.rmtree(tmp_dir)
else:
retcode = self._run_process(worker_runtests, stdout_fd)
tmp_files = []

try:
stdout = stdout_file.read().strip()
except Exception as exc:
# gh-101634: Catch UnicodeDecodeError if stdout cannot be
# decoded from encoding
err_msg = f"Cannot read process stdout: {exc}"
result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
return self.mp_result_error(result, err_msg=err_msg)
return (retcode, tmp_files)

try:
# deserialize run_tests_worker() output
json_file.seek(0)
worker_json: StrJSON = json_file.read()
if worker_json:
result = TestResult.from_json(worker_json)
else:
err_msg = "empty JSON"
except Exception as exc:
# gh-101634: Catch UnicodeDecodeError if stdout cannot be
# decoded from encoding
err_msg = f"Fail to read or parser worker process JSON: {exc}"
result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
return self.mp_result_error(result, stdout, err_msg=err_msg)

if retcode is None:
result = TestResult(test_name, state=State.TIMEOUT)
return self.mp_result_error(result, stdout)
def read_stdout(self, stdout_file: TextIO) -> str:
stdout_file.seek(0)
try:
return stdout_file.read().strip()
except Exception as exc:
# gh-101634: Catch UnicodeDecodeError if stdout cannot be
# decoded from encoding
raise WorkerError(self.test_name,
f"Cannot read process stdout: {exc}", None)

def read_json(self, json_file: JsonFile, json_tmpfile: TextIO | None,
stdout: str) -> TestResult:
try:
if json_tmpfile is not None:
json_tmpfile.seek(0)
worker_json: StrJSON = json_tmpfile.read()
else:
with json_file.open(encoding='utf8') as json_fp:
worker_json: StrJSON = json_fp.read()
except Exception as exc:
# gh-101634: Catch UnicodeDecodeError if stdout cannot be
# decoded from encoding
err_msg = f"Failed to read worker process JSON: {exc}"
raise WorkerError(self.test_name, err_msg, stdout,
state=State.MULTIPROCESSING_ERROR)

if not worker_json:
raise WorkerError(self.test_name, "empty JSON", stdout)

if retcode != 0:
err_msg = "Exit code %s" % retcode
try:
return TestResult.from_json(worker_json)
except Exception as exc:
# gh-101634: Catch UnicodeDecodeError if stdout cannot be
# decoded from encoding
err_msg = f"Failed to parse worker process JSON: {exc}"
raise WorkerError(self.test_name, err_msg, stdout,
state=State.MULTIPROCESSING_ERROR)

def _runtest(self, test_name: TestName) -> MultiprocessResult:
with contextlib.ExitStack() as stack:
stdout_file = self.create_stdout(stack)
json_file, json_tmpfile = self.create_json_file(stack)
worker_runtests = self.create_worker_runtests(test_name, json_file)

retcode, tmp_files = self.run_tmp_files(worker_runtests,
stdout_file.fileno())

stdout = self.read_stdout(stdout_file)

if err_msg:
result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
return self.mp_result_error(result, stdout, err_msg)
if retcode is None:
raise WorkerError(self.test_name, None, stdout, state=State.TIMEOUT)

result = self.read_json(json_file, json_tmpfile, stdout)

if retcode != 0:
raise WorkerError(self.test_name, f"Exit code {retcode}", stdout)

if tmp_files:
msg = (f'\n\n'
Expand All @@ -319,7 +366,13 @@ def run(self) -> None:
break

self.start_time = time.monotonic()
mp_result = self._runtest(test_name)
self.test_name = test_name
try:
mp_result = self._runtest(test_name)
except WorkerError as exc:
mp_result = exc.mp_result
finally:
self.test_name = None
mp_result.result.duration = time.monotonic() - self.start_time
self.output.put((False, mp_result))

Expand Down Expand Up @@ -367,12 +420,12 @@ def wait_stopped(self, start_time: float) -> None:
def get_running(workers: list[WorkerThread]) -> list[str]:
running = []
for worker in workers:
current_test_name = worker.current_test_name
if not current_test_name:
test_name = worker.test_name
if not test_name:
continue
dt = time.monotonic() - worker.start_time
if dt >= PROGRESS_MIN_TIME:
text = '%s (%s)' % (current_test_name, format_duration(dt))
text = f'{test_name} ({format_duration(dt)})'
running.append(text)
if not running:
return None
Expand Down
Loading