|
5 | 5 | import os
|
6 | 6 | import re
|
7 | 7 | import sys
|
| 8 | +from collections import defaultdict |
| 9 | +from typing import Iterator |
8 | 10 |
|
9 | 11 | from mypy import build
|
10 | 12 | from mypy.build import Graph
|
11 | 13 | from mypy.errors import CompileError
|
12 | 14 | from mypy.modulefinder import BuildSource, FindModuleCache, SearchPaths
|
13 | 15 | from mypy.options import TYPE_VAR_TUPLE, UNPACK
|
14 | 16 | from mypy.test.config import test_data_prefix, test_temp_dir
|
15 |
| -from mypy.test.data import DataDrivenTestCase, DataSuite, FileOperation, module_from_path |
| 17 | +from mypy.test.data import ( |
| 18 | + DataDrivenTestCase, |
| 19 | + DataFileCollector, |
| 20 | + DataFileFix, |
| 21 | + DataSuite, |
| 22 | + FileOperation, |
| 23 | + module_from_path, |
| 24 | + parse_test_data, |
| 25 | +) |
16 | 26 | from mypy.test.helpers import (
|
17 | 27 | assert_module_equivalence,
|
18 | 28 | assert_string_arrays_equal,
|
|
22 | 32 | normalize_error_messages,
|
23 | 33 | parse_options,
|
24 | 34 | perform_file_operations,
|
25 |
| - update_testcase_output, |
26 | 35 | )
|
27 | 36 |
|
28 | 37 | try:
|
@@ -192,7 +201,13 @@ def run_case_once(
|
192 | 201 | output = testcase.output2.get(incremental_step, [])
|
193 | 202 |
|
194 | 203 | if output != a and testcase.config.getoption("--update-data", False):
|
195 |
| - update_testcase_output(testcase, a) |
| 204 | + collector = testcase.parent |
| 205 | + assert isinstance(collector, DataFileCollector) |
| 206 | + for fix in self.iter_data_file_fixes( |
| 207 | + testcase, actual=a, incremental_step=incremental_step |
| 208 | + ): |
| 209 | + collector.enqueue_fix(fix) |
| 210 | + |
196 | 211 | assert_string_arrays_equal(output, a, msg.format(testcase.file, testcase.line))
|
197 | 212 |
|
198 | 213 | if res:
|
@@ -226,6 +241,74 @@ def run_case_once(
|
226 | 241 | if testcase.output_files:
|
227 | 242 | check_test_output_files(testcase, incremental_step, strip_prefix="tmp/")
|
228 | 243 |
|
| 244 | + def iter_data_file_fixes( |
| 245 | + self, testcase: DataDrivenTestCase, *, actual: list[str], incremental_step: int |
| 246 | + ) -> Iterator[DataFileFix]: |
| 247 | + reports_by_line: dict[tuple[str, int], list[tuple[str, str]]] = defaultdict(list) |
| 248 | + for error_line in actual: |
| 249 | + comment_match = re.match( |
| 250 | + r"^(?P<filename>[^:]+):(?P<lineno>\d+): (?P<severity>error|note|warning): (?P<msg>.+)$", |
| 251 | + error_line, |
| 252 | + ) |
| 253 | + if comment_match: |
| 254 | + filename = comment_match.group("filename") |
| 255 | + lineno = int(comment_match.group("lineno")) |
| 256 | + severity = comment_match.group("severity") |
| 257 | + msg = comment_match.group("msg") |
| 258 | + reports_by_line[filename, lineno].append((severity, msg)) |
| 259 | + |
| 260 | + test_items = parse_test_data(testcase.data, testcase.name) |
| 261 | + |
| 262 | + # If we have [out] and/or [outN], we update just those sections. |
| 263 | + if any(re.match(r"^out\d*$", test_item.id) for test_item in test_items): |
| 264 | + for test_item in test_items: |
| 265 | + if (incremental_step < 2 and test_item.id == "out") or ( |
| 266 | + incremental_step >= 2 and test_item.id == f"out{incremental_step}" |
| 267 | + ): |
| 268 | + yield DataFileFix( |
| 269 | + lineno=testcase.line + test_item.line - 1, |
| 270 | + end_lineno=testcase.line + test_item.end_line - 1, |
| 271 | + lines=actual + [""] * test_item.trimmed_newlines, |
| 272 | + ) |
| 273 | + |
| 274 | + return |
| 275 | + |
| 276 | + # Update assertion comments within the sections |
| 277 | + for test_item in test_items: |
| 278 | + if test_item.id == "case": |
| 279 | + source_lines = test_item.data |
| 280 | + file_path = "main" |
| 281 | + elif test_item.id == "file": |
| 282 | + source_lines = test_item.data |
| 283 | + file_path = f"tmp/{test_item.arg}" |
| 284 | + else: |
| 285 | + continue # other sections we don't touch |
| 286 | + |
| 287 | + fix_lines = [] |
| 288 | + for lineno, source_line in enumerate(source_lines, start=1): |
| 289 | + reports = reports_by_line.get((file_path, lineno)) |
| 290 | + comment_match = re.search(r"(?P<indent>\s+)(?P<comment># [EWN]: .+)$", source_line) |
| 291 | + if comment_match: |
| 292 | + source_line = source_line[: comment_match.start("indent")] # strip old comment |
| 293 | + if reports: |
| 294 | + indent = comment_match.group("indent") if comment_match else " " |
| 295 | + # multiline comments are on the first line and then on subsequent lines emtpy lines |
| 296 | + # with a continuation backslash |
| 297 | + for j, (severity, msg) in enumerate(reports): |
| 298 | + out_l = source_line if j == 0 else " " * len(source_line) |
| 299 | + is_last = j == len(reports) - 1 |
| 300 | + severity_char = severity[0].upper() |
| 301 | + continuation = "" if is_last else " \\" |
| 302 | + fix_lines.append(f"{out_l}{indent}# {severity_char}: {msg}{continuation}") |
| 303 | + else: |
| 304 | + fix_lines.append(source_line) |
| 305 | + |
| 306 | + yield DataFileFix( |
| 307 | + lineno=testcase.line + test_item.line - 1, |
| 308 | + end_lineno=testcase.line + test_item.end_line - 1, |
| 309 | + lines=fix_lines + [""] * test_item.trimmed_newlines, |
| 310 | + ) |
| 311 | + |
229 | 312 | def verify_cache(
|
230 | 313 | self,
|
231 | 314 | module_data: list[tuple[str, str, str]],
|
|
0 commit comments