16
16
import traceback
17
17
from contextlib import redirect_stderr , redirect_stdout
18
18
19
- from typing import AbstractSet , Any , Callable , Dict , List , Mapping , Optional , Sequence , Tuple
19
+ from typing import AbstractSet , Any , Callable , Dict , List , Optional , Sequence , Tuple
20
20
from typing_extensions import Final
21
21
22
22
import mypy .build
33
33
from mypy .suggestions import SuggestionFailure , SuggestionEngine
34
34
from mypy .typestate import reset_global_state
35
35
from mypy .version import __version__
36
+ from mypy .util import FancyFormatter , count_stats
36
37
37
38
MEM_PROFILE = False # type: Final # If True, dump memory profile after initialization
38
39
@@ -188,6 +189,10 @@ def __init__(self, options: Options,
188
189
options .local_partial_types = True
189
190
self .status_file = status_file
190
191
192
+ # Since the object is created in the parent process we can check
193
+ # the output terminal options here.
194
+ self .formatter = FancyFormatter (sys .stdout , sys .stderr , options .show_error_codes )
195
+
191
196
def _response_metadata (self ) -> Dict [str , str ]:
192
197
py_version = '{}_{}' .format (self .options .python_version [0 ], self .options .python_version [1 ])
193
198
return {
@@ -248,13 +253,18 @@ def serve(self) -> None:
248
253
if exc_info [0 ] and exc_info [0 ] is not SystemExit :
249
254
traceback .print_exception (* exc_info )
250
255
251
- def run_command (self , command : str , data : Mapping [str , object ]) -> Dict [str , object ]:
256
+ def run_command (self , command : str , data : Dict [str , object ]) -> Dict [str , object ]:
252
257
"""Run a specific command from the registry."""
253
258
key = 'cmd_' + command
254
259
method = getattr (self .__class__ , key , None )
255
260
if method is None :
256
261
return {'error' : "Unrecognized command '%s'" % command }
257
262
else :
263
+ if command not in {'check' , 'recheck' , 'run' }:
264
+ # Only the above commands use some error formatting.
265
+ del data ['is_tty' ]
266
+ elif int (os .getenv ('MYPY_FORCE_COLOR' , '0' )):
267
+ data ['is_tty' ] = True
258
268
return method (self , ** data )
259
269
260
270
# Command functions (run in the server via RPC).
@@ -280,7 +290,7 @@ def cmd_stop(self) -> Dict[str, object]:
280
290
os .unlink (self .status_file )
281
291
return {}
282
292
283
- def cmd_run (self , version : str , args : Sequence [str ]) -> Dict [str , object ]:
293
+ def cmd_run (self , version : str , args : Sequence [str ], is_tty : bool ) -> Dict [str , object ]:
284
294
"""Check a list of files, triggering a restart if needed."""
285
295
try :
286
296
# Process options can exit on improper arguments, so we need to catch that and
@@ -313,17 +323,18 @@ def cmd_run(self, version: str, args: Sequence[str]) -> Dict[str, object]:
313
323
return {'out' : '' , 'err' : str (err ), 'status' : 2 }
314
324
except SystemExit as e :
315
325
return {'out' : stdout .getvalue (), 'err' : stderr .getvalue (), 'status' : e .code }
316
- return self .check (sources )
326
+ return self .check (sources , is_tty )
317
327
318
- def cmd_check (self , files : Sequence [str ]) -> Dict [str , object ]:
328
+ def cmd_check (self , files : Sequence [str ], is_tty : bool ) -> Dict [str , object ]:
319
329
"""Check a list of files."""
320
330
try :
321
331
sources = create_source_list (files , self .options , self .fscache )
322
332
except InvalidSourceList as err :
323
333
return {'out' : '' , 'err' : str (err ), 'status' : 2 }
324
- return self .check (sources )
334
+ return self .check (sources , is_tty )
325
335
326
336
def cmd_recheck (self ,
337
+ is_tty : bool ,
327
338
remove : Optional [List [str ]] = None ,
328
339
update : Optional [List [str ]] = None ) -> Dict [str , object ]:
329
340
"""Check the same list of files we checked most recently.
@@ -349,17 +360,21 @@ def cmd_recheck(self,
349
360
t1 = time .time ()
350
361
manager = self .fine_grained_manager .manager
351
362
manager .log ("fine-grained increment: cmd_recheck: {:.3f}s" .format (t1 - t0 ))
352
- res = self .fine_grained_increment (sources , remove , update )
363
+ res = self .fine_grained_increment (sources , is_tty , remove , update )
353
364
self .fscache .flush ()
354
365
self .update_stats (res )
355
366
return res
356
367
357
- def check (self , sources : List [BuildSource ]) -> Dict [str , Any ]:
358
- """Check using fine-grained incremental mode."""
368
+ def check (self , sources : List [BuildSource ], is_tty : bool ) -> Dict [str , Any ]:
369
+ """Check using fine-grained incremental mode.
370
+
371
+ If is_tty is True format the output nicely with colors and summary line
372
+ (unless disabled in self.options).
373
+ """
359
374
if not self .fine_grained_manager :
360
- res = self .initialize_fine_grained (sources )
375
+ res = self .initialize_fine_grained (sources , is_tty )
361
376
else :
362
- res = self .fine_grained_increment (sources )
377
+ res = self .fine_grained_increment (sources , is_tty )
363
378
self .fscache .flush ()
364
379
self .update_stats (res )
365
380
return res
@@ -371,7 +386,8 @@ def update_stats(self, res: Dict[str, Any]) -> None:
371
386
res ['stats' ] = manager .stats
372
387
manager .stats = {}
373
388
374
- def initialize_fine_grained (self , sources : List [BuildSource ]) -> Dict [str , Any ]:
389
+ def initialize_fine_grained (self , sources : List [BuildSource ],
390
+ is_tty : bool ) -> Dict [str , Any ]:
375
391
self .fswatcher = FileSystemWatcher (self .fscache )
376
392
t0 = time .time ()
377
393
self .update_sources (sources )
@@ -433,10 +449,12 @@ def initialize_fine_grained(self, sources: List[BuildSource]) -> Dict[str, Any]:
433
449
print_memory_profile (run_gc = False )
434
450
435
451
status = 1 if messages else 0
452
+ messages = self .pretty_messages (messages , len (sources ), is_tty )
436
453
return {'out' : '' .join (s + '\n ' for s in messages ), 'err' : '' , 'status' : status }
437
454
438
455
def fine_grained_increment (self ,
439
456
sources : List [BuildSource ],
457
+ is_tty : bool ,
440
458
remove : Optional [List [str ]] = None ,
441
459
update : Optional [List [str ]] = None ,
442
460
) -> Dict [str , Any ]:
@@ -466,8 +484,28 @@ def fine_grained_increment(self,
466
484
467
485
status = 1 if messages else 0
468
486
self .previous_sources = sources
487
+ messages = self .pretty_messages (messages , len (sources ), is_tty )
469
488
return {'out' : '' .join (s + '\n ' for s in messages ), 'err' : '' , 'status' : status }
470
489
490
+ def pretty_messages (self , messages : List [str ], n_sources : int ,
491
+ is_tty : bool = False ) -> List [str ]:
492
+ use_color = self .options .color_output and is_tty
493
+ if self .options .error_summary :
494
+ summary = None # type: Optional[str]
495
+ if messages :
496
+ n_errors , n_files = count_stats (messages )
497
+ if n_errors :
498
+ summary = self .formatter .format_error (n_errors , n_files , n_sources ,
499
+ use_color )
500
+ else :
501
+ summary = self .formatter .format_success (n_sources , use_color )
502
+ if summary :
503
+ # Create new list to avoid appending multiple summaries on successive runs.
504
+ messages = messages + [summary ]
505
+ if use_color :
506
+ messages = [self .formatter .colorize (m ) for m in messages ]
507
+ return messages
508
+
471
509
def update_sources (self , sources : List [BuildSource ]) -> None :
472
510
paths = [source .path for source in sources if source .path is not None ]
473
511
self .fswatcher .add_watched_paths (paths )
0 commit comments