Skip to content

bpo-36719: Fix regrtest MultiprocessThread #13301

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 14, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 55 additions & 4 deletions Lib/test/libregrtest/runtest_mp.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
# Display the running tests if nothing happened last N seconds
PROGRESS_UPDATE = 30.0 # seconds

# Time to wait until a worker completes: should be immediate
JOIN_TIMEOUT = 30.0 # seconds


def must_stop(result, ns):
if result.result == INTERRUPTED:
Expand Down Expand Up @@ -91,6 +94,10 @@ def stop(self):
MultiprocessResult = collections.namedtuple('MultiprocessResult',
'result stdout stderr error_msg')

class ExitThread(Exception):
pass


class MultiprocessThread(threading.Thread):
def __init__(self, pending, output, ns):
super().__init__()
Expand All @@ -100,13 +107,31 @@ def __init__(self, pending, output, ns):
self.current_test_name = None
self.start_time = None
self._popen = None
self._killed = False

def __repr__(self):
info = ['MultiprocessThread']
test = self.current_test_name
if self.is_alive():
info.append('alive')
if test:
info.append(f'test={test}')
popen = self._popen
if popen:
info.append(f'pid={popen.pid}')
return '<%s>' % ' '.join(info)

def kill(self):
self._killed = True

popen = self._popen
if popen is None:
return
print("Kill regrtest worker process %s" % popen.pid)
popen.kill()
# stdout and stderr must be closed to ensure that communicate()
# does not hang
popen.stdout.close()
popen.stderr.close()

def _runtest(self, test_name):
try:
Expand All @@ -117,7 +142,21 @@ def _runtest(self, test_name):
popen = self._popen
with popen:
try:
stdout, stderr = popen.communicate()
if self._killed:
# If kill() has been called before self._popen is set,
# self._popen is still running. Call again kill()
# to ensure that the process is killed.
self.kill()
raise ExitThread

try:
stdout, stderr = popen.communicate()
except OSError:
if self._killed:
# kill() has been called: communicate() fails
# on reading closed stdout/stderr
raise ExitThread
raise
except:
self.kill()
popen.wait()
Expand Down Expand Up @@ -154,7 +193,7 @@ def _runtest(self, test_name):
return MultiprocessResult(result, stdout, stderr, err_msg)

def run(self):
while True:
while not self._killed:
try:
try:
test_name = next(self.pending)
Expand All @@ -166,6 +205,8 @@ def run(self):

if must_stop(mp_result.result, self.ns):
break
except ExitThread:
break
except BaseException:
self.output.put((True, traceback.format_exc()))
break
Expand Down Expand Up @@ -205,10 +246,20 @@ def start_workers(self):
worker.start()

def wait_workers(self):
start_time = time.monotonic()
for worker in self.workers:
worker.kill()
for worker in self.workers:
worker.join()
while True:
worker.join(1.0)
if not worker.is_alive():
break
dt = time.monotonic() - start_time
print("Wait for regrtest worker %r for %.1f sec" % (worker, dt))
if dt > JOIN_TIMEOUT:
print("Warning -- failed to join a regrtest worker %s"
% worker)
break

def _get_result(self):
if not any(worker.is_alive() for worker in self.workers):
Expand Down