21
21
# Display the running tests if nothing happened last N seconds
22
22
PROGRESS_UPDATE = 30.0 # seconds
23
23
24
+ # Time to wait until a worker completes: should be immediate
25
+ JOIN_TIMEOUT = 30.0 # seconds
26
+
24
27
25
28
def must_stop (result , ns ):
26
29
if result .result == INTERRUPTED :
@@ -91,6 +94,10 @@ def stop(self):
91
94
MultiprocessResult = collections .namedtuple ('MultiprocessResult' ,
92
95
'result stdout stderr error_msg' )
93
96
97
+ class ExitThread (Exception ):
98
+ pass
99
+
100
+
94
101
class MultiprocessThread (threading .Thread ):
95
102
def __init__ (self , pending , output , ns ):
96
103
super ().__init__ ()
@@ -100,13 +107,31 @@ def __init__(self, pending, output, ns):
100
107
self .current_test_name = None
101
108
self .start_time = None
102
109
self ._popen = None
110
+ self ._killed = False
111
+
112
+ def __repr__ (self ):
113
+ info = ['MultiprocessThread' ]
114
+ test = self .current_test_name
115
+ if self .is_alive ():
116
+ info .append ('alive' )
117
+ if test :
118
+ info .append (f'test={ test } ' )
119
+ popen = self ._popen
120
+ if popen :
121
+ info .append (f'pid={ popen .pid } ' )
122
+ return '<%s>' % ' ' .join (info )
103
123
104
124
def kill (self ):
125
+ self ._killed = True
126
+
105
127
popen = self ._popen
106
128
if popen is None :
107
129
return
108
- print ("Kill regrtest worker process %s" % popen .pid )
109
130
popen .kill ()
131
+ # stdout and stderr must be closed to ensure that communicate()
132
+ # does not hang
133
+ popen .stdout .close ()
134
+ popen .stderr .close ()
110
135
111
136
def _runtest (self , test_name ):
112
137
try :
@@ -117,7 +142,21 @@ def _runtest(self, test_name):
117
142
popen = self ._popen
118
143
with popen :
119
144
try :
120
- stdout , stderr = popen .communicate ()
145
+ if self ._killed :
146
+ # If kill() has been called before self._popen is set,
147
+ # self._popen is still running. Call again kill()
148
+ # to ensure that the process is killed.
149
+ self .kill ()
150
+ raise ExitThread
151
+
152
+ try :
153
+ stdout , stderr = popen .communicate ()
154
+ except OSError :
155
+ if self ._killed :
156
+ # kill() has been called: communicate() fails
157
+ # on reading closed stdout/stderr
158
+ raise ExitThread
159
+ raise
121
160
except :
122
161
self .kill ()
123
162
popen .wait ()
@@ -154,7 +193,7 @@ def _runtest(self, test_name):
154
193
return MultiprocessResult (result , stdout , stderr , err_msg )
155
194
156
195
def run (self ):
157
- while True :
196
+ while not self . _killed :
158
197
try :
159
198
try :
160
199
test_name = next (self .pending )
@@ -166,6 +205,8 @@ def run(self):
166
205
167
206
if must_stop (mp_result .result , self .ns ):
168
207
break
208
+ except ExitThread :
209
+ break
169
210
except BaseException :
170
211
self .output .put ((True , traceback .format_exc ()))
171
212
break
@@ -205,10 +246,20 @@ def start_workers(self):
205
246
worker .start ()
206
247
207
248
def wait_workers (self ):
249
+ start_time = time .monotonic ()
208
250
for worker in self .workers :
209
251
worker .kill ()
210
252
for worker in self .workers :
211
- worker .join ()
253
+ while True :
254
+ worker .join (1.0 )
255
+ if not worker .is_alive ():
256
+ break
257
+ dt = time .monotonic () - start_time
258
+ print ("Wait for regrtest worker %r for %.1f sec" % (worker , dt ))
259
+ if dt > JOIN_TIMEOUT :
260
+ print ("Warning -- failed to join a regrtest worker %s"
261
+ % worker )
262
+ break
212
263
213
264
def _get_result (self ):
214
265
if not any (worker .is_alive () for worker in self .workers ):
0 commit comments