3
3
This is used for running mypy tests.
4
4
"""
5
5
6
- from typing import Dict , List , Optional , Set , Tuple , Any
6
+ from typing import Dict , List , Optional , Set , Tuple , Any , Iterable
7
7
8
8
import os
9
9
from multiprocessing import cpu_count
@@ -113,7 +113,8 @@ class Waiter:
113
113
LOGSIZE = 50
114
114
FULL_LOG_FILENAME = '.runtest_log.json'
115
115
116
- def __init__ (self , limit : int = 0 , * , verbosity : int = 0 , xfail : List [str ] = []) -> None :
116
+ def __init__ (self , limit : int = 0 , * , verbosity : int = 0 , xfail : List [str ] = [],
117
+ lf : bool = False , ff : bool = False ) -> None :
117
118
self .verbosity = verbosity
118
119
self .queue = [] # type: List[LazySubprocess]
119
120
# Index of next task to run in the queue.
@@ -130,12 +131,15 @@ def __init__(self, limit: int = 0, *, verbosity: int = 0, xfail: List[str] = [])
130
131
# major mistake to count *all* CPUs on the machine.
131
132
limit = len (sched_getaffinity (0 ))
132
133
self .limit = limit
134
+ self .lf = lf
135
+ self .ff = ff
133
136
assert limit > 0
134
137
self .xfail = set (xfail )
135
138
self ._note = None # type: Noter
136
139
self .times1 = {} # type: Dict[str, float]
137
140
self .times2 = {} # type: Dict[str, float]
138
- self .new_log = defaultdict (dict ) # type: Dict[str, Dict[str, Any]]
141
+ self .new_log = defaultdict (dict ) # type: Dict[str, Dict[str, float]]
142
+ self .sequential_tasks = set () # type: Set[str]
139
143
140
144
def load_log_file (self ) -> Optional [List [Dict [str , Dict [str , Any ]]]]:
141
145
try :
@@ -149,9 +153,13 @@ def load_log_file(self) -> Optional[List[Dict[str, Dict[str, Any]]]]:
149
153
test_log = []
150
154
return test_log
151
155
152
- def add (self , cmd : LazySubprocess ) -> int :
156
+ def add (self , cmd : LazySubprocess , sequential : bool = False ) -> int :
153
157
rv = len (self .queue )
158
+ if cmd .name in (task .name for task in self .queue ):
159
+ sys .exit ('Duplicate test name: {}' .format (cmd .name ))
154
160
self .queue .append (cmd )
161
+ if sequential :
162
+ self .sequential_tasks .add (cmd .name )
155
163
return rv
156
164
157
165
def _start_next (self ) -> None :
@@ -187,8 +195,8 @@ def _poll_current(self) -> Tuple[int, int]:
187
195
code = cmd .process .poll ()
188
196
if code is not None :
189
197
cmd .end_time = time .perf_counter ()
190
- self .new_log [cmd . name ][ 'exit_code' ] = code
191
- self .new_log [cmd . name ][ 'runtime' ] = cmd .end_time - cmd .start_time
198
+ self .new_log ['exit_code' ][ cmd . name ] = code
199
+ self .new_log ['runtime' ][ cmd . name ] = cmd .end_time - cmd .start_time
192
200
return pid , code
193
201
194
202
def _wait_next (self ) -> Tuple [List [str ], int , int ]:
@@ -262,16 +270,43 @@ def run(self) -> int:
262
270
self ._note = Noter (len (self .queue ))
263
271
print ('SUMMARY %d tasks selected' % len (self .queue ))
264
272
265
- if self . limit > 1 :
266
- logs = self . load_log_file ()
267
- if logs :
273
+ def avg ( lst : Iterable [ float ]) -> float :
274
+ valid_items = [ item for item in lst if item is not None ]
275
+ if not valid_items :
268
276
# we don't know how long a new task takes
269
277
# better err by putting it in front in case it is slow:
270
278
# a fast task in front hurts performance less than a slow task in the back
271
- default = float ('inf' )
272
- times = {cmd .name : sum (log [cmd .name ].get ('runtime' , default ) for log in logs )
273
- / len (logs ) for cmd in self .queue }
274
- self .queue = sorted (self .queue , key = lambda cmd : times [cmd .name ], reverse = True )
279
+ return float ('inf' )
280
+ else :
281
+ return sum (valid_items ) / len (valid_items )
282
+
283
+ logs = self .load_log_file ()
284
+ if logs :
285
+ times = {cmd .name : avg (log ['runtime' ].get (cmd .name , None ) for log in logs )
286
+ for cmd in self .queue }
287
+
288
+ def sort_function (cmd : LazySubprocess ) -> Tuple [Any , int , float ]:
289
+ # longest tasks first
290
+ runtime = - times [cmd .name ]
291
+ # sequential tasks go first by default
292
+ sequential = - (cmd .name in self .sequential_tasks )
293
+ if self .ff :
294
+ # failed tasks first with -ff
295
+ exit_code = - logs [- 1 ]['exit_code' ].get (cmd .name , 0 )
296
+ if not exit_code :
297
+ # avoid interrupting parallel tasks with sequential in between
298
+ # => order: seq failed, parallel failed, parallel passed, seq passed
299
+ # => among failed tasks, sequential should go before parallel
300
+ # => among successful tasks, sequential should go after parallel
301
+ sequential = - sequential
302
+ else :
303
+ # ignore exit code without -ff
304
+ exit_code = 0
305
+ return exit_code , sequential , runtime
306
+ self .queue = sorted (self .queue , key = sort_function )
307
+ if self .lf :
308
+ self .queue = [cmd for cmd in self .queue
309
+ if logs [- 1 ]['exit_code' ].get (cmd .name , 0 )]
275
310
276
311
sys .stdout .flush ()
277
312
# Failed tasks.
@@ -280,15 +315,35 @@ def run(self) -> int:
280
315
total_tests = 0
281
316
# Number of failed test cases.
282
317
total_failed_tests = 0
318
+ running_sequential_task = False
283
319
while self .current or self .next < len (self .queue ):
284
320
while len (self .current ) < self .limit and self .next < len (self .queue ):
321
+ # only start next task if idle, or current and next tasks are both parallel
322
+ if running_sequential_task :
323
+ break
324
+ if self .queue [self .next ].name in self .sequential_tasks :
325
+ if self .current :
326
+ break
327
+ else :
328
+ running_sequential_task = True
285
329
self ._start_next ()
286
330
fails , tests , test_fails = self ._wait_next ()
331
+ running_sequential_task = False
287
332
all_failures += fails
288
333
total_tests += tests
289
334
total_failed_tests += test_fails
290
335
if self .verbosity == 0 :
291
336
self ._note .clear ()
337
+
338
+ if self .new_log : # don't append empty log, it will corrupt the cache file
339
+ # log only LOGSIZE most recent tests
340
+ test_log = (self .load_log_file () + [self .new_log ])[:self .LOGSIZE ]
341
+ try :
342
+ with open (self .FULL_LOG_FILENAME , 'w' ) as fp :
343
+ json .dump (test_log , fp , sort_keys = True , indent = 4 )
344
+ except Exception as e :
345
+ print ('cannot save test log file:' , e )
346
+
292
347
if all_failures :
293
348
summary = 'SUMMARY %d/%d tasks and %d/%d tests failed' % (
294
349
len (all_failures ), len (self .queue ), total_failed_tests , total_tests )
@@ -305,16 +360,6 @@ def run(self) -> int:
305
360
len (self .queue ), total_tests ))
306
361
print ('*** OK ***' )
307
362
sys .stdout .flush ()
308
-
309
- if self .limit > 1 :
310
- # log only LOGSIZE most recent tests
311
- test_log = (self .load_log_file () + [self .new_log ])[:self .LOGSIZE ]
312
- try :
313
- with open (self .FULL_LOG_FILENAME , 'w' ) as fp :
314
- json .dump (test_log , fp , sort_keys = True , indent = 4 )
315
- except Exception as e :
316
- print ('cannot save test log file:' , e )
317
-
318
363
return 0
319
364
320
365
0 commit comments