3
3
This is used for running mypy tests.
4
4
"""
5
5
6
- from typing import Dict , List , Optional , Set , Tuple
6
+ from typing import Dict , List , Optional , Set , Tuple , Any , Iterable
7
7
8
8
import os
9
+ from multiprocessing import cpu_count
9
10
import pipes
10
11
import re
11
12
from subprocess import Popen , STDOUT
12
13
import sys
13
14
import tempfile
14
15
import time
16
+ import json
17
+ from collections import defaultdict
15
18
16
19
17
20
class WaiterError (Exception ):
@@ -32,7 +35,7 @@ def __init__(self, name: str, args: List[str], *, cwd: str = None,
32
35
33
36
def start (self ) -> None :
34
37
self .outfile = tempfile .TemporaryFile ()
35
- self .start_time = time .time ()
38
+ self .start_time = time .perf_counter ()
36
39
self .process = Popen (self .args , cwd = self .cwd , env = self .env ,
37
40
stdout = self .outfile , stderr = STDOUT )
38
41
self .pid = self .process .pid
@@ -107,7 +110,11 @@ class Waiter:
107
110
if not waiter.run():
108
111
print('error')
109
112
"""
110
- def __init__ (self , limit : int = 0 , * , verbosity : int = 0 , xfail : List [str ] = []) -> None :
113
+ LOGSIZE = 50
114
+ FULL_LOG_FILENAME = '.runtest_log.json'
115
+
116
+ def __init__ (self , limit : int = 0 , * , verbosity : int = 0 , xfail : List [str ] = [],
117
+ lf : bool = False , ff : bool = False ) -> None :
111
118
self .verbosity = verbosity
112
119
self .queue = [] # type: List[LazySubprocess]
113
120
# Index of next task to run in the queue.
@@ -117,21 +124,42 @@ def __init__(self, limit: int = 0, *, verbosity: int = 0, xfail: List[str] = [])
117
124
try :
118
125
sched_getaffinity = os .sched_getaffinity
119
126
except AttributeError :
120
- limit = 2
127
+ # no support for affinity on OSX/Windows
128
+ limit = cpu_count ()
121
129
else :
122
130
# Note: only count CPUs we are allowed to use. It is a
123
131
# major mistake to count *all* CPUs on the machine.
124
132
limit = len (sched_getaffinity (0 ))
125
133
self .limit = limit
134
+ self .lf = lf
135
+ self .ff = ff
126
136
assert limit > 0
127
137
self .xfail = set (xfail )
128
138
self ._note = None # type: Noter
129
139
self .times1 = {} # type: Dict[str, float]
130
140
self .times2 = {} # type: Dict[str, float]
131
-
132
- def add (self , cmd : LazySubprocess ) -> int :
141
+ self .new_log = defaultdict (dict ) # type: Dict[str, Dict[str, float]]
142
+ self .sequential_tasks = set () # type: Set[str]
143
+
144
+ def load_log_file (self ) -> Optional [List [Dict [str , Dict [str , Any ]]]]:
145
+ try :
146
+ # get the last log
147
+ with open (self .FULL_LOG_FILENAME ) as fp :
148
+ test_log = json .load (fp )
149
+ except FileNotFoundError :
150
+ test_log = []
151
+ except json .JSONDecodeError :
152
+ print ('corrupt test log file {}' .format (self .FULL_LOG_FILENAME ), file = sys .stderr )
153
+ test_log = []
154
+ return test_log
155
+
156
+ def add (self , cmd : LazySubprocess , sequential : bool = False ) -> int :
133
157
rv = len (self .queue )
158
+ if cmd .name in (task .name for task in self .queue ):
159
+ sys .exit ('Duplicate test name: {}' .format (cmd .name ))
134
160
self .queue .append (cmd )
161
+ if sequential :
162
+ self .sequential_tasks .add (cmd .name )
135
163
return rv
136
164
137
165
def _start_next (self ) -> None :
@@ -161,12 +189,14 @@ def _record_time(self, name: str, elapsed_time: float) -> None:
161
189
162
190
def _poll_current (self ) -> Tuple [int , int ]:
163
191
while True :
164
- time .sleep (.05 )
192
+ time .sleep (.01 )
165
193
for pid in self .current :
166
194
cmd = self .current [pid ][1 ]
167
195
code = cmd .process .poll ()
168
196
if code is not None :
169
- cmd .end_time = time .time ()
197
+ cmd .end_time = time .perf_counter ()
198
+ self .new_log ['exit_code' ][cmd .name ] = code
199
+ self .new_log ['runtime' ][cmd .name ] = cmd .end_time - cmd .start_time
170
200
return pid , code
171
201
172
202
def _wait_next (self ) -> Tuple [List [str ], int , int ]:
@@ -239,22 +269,83 @@ def run(self) -> int:
239
269
if self .verbosity == 0 :
240
270
self ._note = Noter (len (self .queue ))
241
271
print ('SUMMARY %d tasks selected' % len (self .queue ))
272
+
273
+ def avg (lst : Iterable [float ]) -> float :
274
+ valid_items = [item for item in lst if item is not None ]
275
+ if not valid_items :
276
+ # we don't know how long a new task takes
277
+ # better err by putting it in front in case it is slow:
278
+ # a fast task in front hurts performance less than a slow task in the back
279
+ return float ('inf' )
280
+ else :
281
+ return sum (valid_items ) / len (valid_items )
282
+
283
+ logs = self .load_log_file ()
284
+ if logs :
285
+ times = {cmd .name : avg (log ['runtime' ].get (cmd .name , None ) for log in logs )
286
+ for cmd in self .queue }
287
+
288
+ def sort_function (cmd : LazySubprocess ) -> Tuple [Any , int , float ]:
289
+ # longest tasks first
290
+ runtime = - times [cmd .name ]
291
+ # sequential tasks go first by default
292
+ sequential = - (cmd .name in self .sequential_tasks )
293
+ if self .ff :
294
+ # failed tasks first with -ff
295
+ exit_code = - logs [- 1 ]['exit_code' ].get (cmd .name , 0 )
296
+ if not exit_code :
297
+ # avoid interrupting parallel tasks with sequential in between
298
+ # so either: seq failed, parallel failed, parallel passed, seq passed
299
+ # or: parallel failed, seq failed, seq passed, parallel passed
300
+ # I picked the first one arbitrarily, since no obvious pros/cons
301
+ # in other words, among failed tasks, sequential should go before parallel,
302
+ # and among successful tasks, sequential should go after parallel
303
+ sequential = - sequential
304
+ else :
305
+ # ignore exit code without -ff
306
+ exit_code = 0
307
+ return exit_code , sequential , runtime
308
+ self .queue = sorted (self .queue , key = sort_function )
309
+ if self .lf :
310
+ self .queue = [cmd for cmd in self .queue
311
+ if logs [- 1 ]['exit_code' ].get (cmd .name , 0 )]
312
+
242
313
sys .stdout .flush ()
243
314
# Failed tasks.
244
315
all_failures = [] # type: List[str]
245
316
# Number of test cases. Some tasks can involve multiple test cases.
246
317
total_tests = 0
247
318
# Number of failed test cases.
248
319
total_failed_tests = 0
320
+ running_sequential_task = False
249
321
while self .current or self .next < len (self .queue ):
250
322
while len (self .current ) < self .limit and self .next < len (self .queue ):
323
+ # only start next task if idle, or current and next tasks are both parallel
324
+ if running_sequential_task :
325
+ break
326
+ if self .queue [self .next ].name in self .sequential_tasks :
327
+ if self .current :
328
+ break
329
+ else :
330
+ running_sequential_task = True
251
331
self ._start_next ()
252
332
fails , tests , test_fails = self ._wait_next ()
333
+ running_sequential_task = False
253
334
all_failures += fails
254
335
total_tests += tests
255
336
total_failed_tests += test_fails
256
337
if self .verbosity == 0 :
257
338
self ._note .clear ()
339
+
340
+ if self .new_log : # don't append empty log, it will corrupt the cache file
341
+ # log only LOGSIZE most recent tests
342
+ test_log = (self .load_log_file () + [self .new_log ])[:self .LOGSIZE ]
343
+ try :
344
+ with open (self .FULL_LOG_FILENAME , 'w' ) as fp :
345
+ json .dump (test_log , fp , sort_keys = True , indent = 4 )
346
+ except Exception as e :
347
+ print ('cannot save test log file:' , e )
348
+
258
349
if all_failures :
259
350
summary = 'SUMMARY %d/%d tasks and %d/%d tests failed' % (
260
351
len (all_failures ), len (self .queue ), total_failed_tests , total_tests )
@@ -271,7 +362,6 @@ def run(self) -> int:
271
362
len (self .queue ), total_tests ))
272
363
print ('*** OK ***' )
273
364
sys .stdout .flush ()
274
-
275
365
return 0
276
366
277
367
0 commit comments