1
1
"""This module contains code for capturing warnings."""
2
2
from __future__ import annotations
3
3
4
- import functools
5
- import re
6
- import textwrap
7
- import warnings
8
4
from collections import defaultdict
9
- from contextlib import contextmanager
10
5
from typing import Any
11
- from typing import cast
12
6
from typing import Dict
13
7
from typing import Generator
14
8
from typing import List
17
11
import click
18
12
from _pytask .config import hookimpl
19
13
from _pytask .console import console
20
- from _pytask .mark_utils import get_marks
21
14
from _pytask .nodes import Task
22
- from _pytask .outcomes import Exit
23
15
from _pytask .session import Session
24
- from _pytask .warnings_utils import WarningReport
16
+ from _pytask .warnings_utils import catch_warnings_for_item
17
+ from _pytask .warnings_utils import parse_filterwarnings
25
18
from rich .console import Console
26
19
from rich .console import ConsoleOptions
27
20
from rich .console import RenderResult
@@ -50,7 +43,7 @@ def pytask_parse_config(
50
43
) -> None :
51
44
"""Parse the configuration."""
52
45
config ["disable_warnings" ] = config_from_cli .get ("disable_warnings" , False )
53
- config ["filterwarnings" ] = _parse_filterwarnings (
46
+ config ["filterwarnings" ] = parse_filterwarnings (
54
47
config_from_file .get ("filterwarnings" )
55
48
)
56
49
config ["markers" ]["filterwarnings" ] = "Add a filter for a warning to a task."
@@ -63,157 +56,6 @@ def pytask_post_parse(config: dict[str, Any]) -> None:
63
56
config ["pm" ].register (WarningsNameSpace )
64
57
65
58
66
- def _parse_filterwarnings (x : str | list [str ] | None ) -> list [str ]:
67
- """Parse filterwarnings."""
68
- if x is None :
69
- return []
70
- elif isinstance (x , str ):
71
- return [i .strip () for i in x .split ("\n " )]
72
- elif isinstance (x , list ):
73
- return [i .strip () for i in x ]
74
- else :
75
- raise TypeError ("'filterwarnings' must be a str, list[str] or None." )
76
-
77
-
78
- @contextmanager
79
- def catch_warnings_for_item (
80
- session : Session ,
81
- task : Task | None = None ,
82
- when : str | None = None ,
83
- ) -> Generator [None , None , None ]:
84
- """Context manager that catches warnings generated in the contained execution block.
85
-
86
- ``item`` can be None if we are not in the context of an item execution. Each warning
87
- captured triggers the ``pytest_warning_recorded`` hook.
88
-
89
- """
90
- with warnings .catch_warnings (record = True ) as log :
91
- # mypy can't infer that record=True means log is not None; help it.
92
- assert log is not None
93
-
94
- for arg in session .config ["filterwarnings" ]:
95
- warnings .filterwarnings (* parse_warning_filter (arg , escape = False ))
96
-
97
- # apply filters from "filterwarnings" marks
98
- if task is not None :
99
- for mark in get_marks (task , "filterwarnings" ):
100
- for arg in mark .args :
101
- warnings .filterwarnings (* parse_warning_filter (arg , escape = False ))
102
-
103
- yield
104
-
105
- if task is not None :
106
- id_ = task .short_name
107
- else :
108
- id_ = when
109
-
110
- for warning_message in log :
111
- fs_location = warning_message .filename , warning_message .lineno
112
- session .warnings .append (
113
- WarningReport (
114
- message = warning_record_to_str (warning_message ),
115
- fs_location = fs_location ,
116
- id_ = id_ ,
117
- )
118
- )
119
-
120
-
121
- @functools .lru_cache (maxsize = 50 )
122
- def parse_warning_filter (
123
- arg : str , * , escape : bool
124
- ) -> tuple [warnings ._ActionKind , str , type [Warning ], str , int ]:
125
- """Parse a warnings filter string.
126
-
127
- This is copied from warnings._setoption with the following changes:
128
-
129
- - Does not apply the filter.
130
- - Escaping is optional.
131
- - Raises UsageError so we get nice error messages on failure.
132
-
133
- """
134
- __tracebackhide__ = True
135
- error_template = textwrap .dedent (
136
- f"""\
137
- while parsing the following warning configuration:
138
- { arg }
139
- This error occurred:
140
- {{error}}
141
- """
142
- )
143
-
144
- parts = arg .split (":" )
145
- if len (parts ) > 5 :
146
- doc_url = (
147
- "https://docs.python.org/3/library/warnings.html#describing-warning-filters"
148
- )
149
- error = textwrap .dedent (
150
- f"""\
151
- Too many fields ({ len (parts )} ), expected at most 5 separated by colons:
152
- action:message:category:module:line
153
- For more information please consult: { doc_url }
154
- """
155
- )
156
- raise Exit (error_template .format (error = error ))
157
-
158
- while len (parts ) < 5 :
159
- parts .append ("" )
160
- action_ , message , category_ , module , lineno_ = (s .strip () for s in parts )
161
- try :
162
- action : warnings ._ActionKind = warnings ._getaction (action_ ) # type: ignore
163
- except warnings ._OptionError as e :
164
- raise Exit (error_template .format (error = str (e )))
165
- try :
166
- category : type [Warning ] = _resolve_warning_category (category_ )
167
- except Exit as e :
168
- raise Exit (str (e ))
169
- if message and escape :
170
- message = re .escape (message )
171
- if module and escape :
172
- module = re .escape (module ) + r"\Z"
173
- if lineno_ :
174
- try :
175
- lineno = int (lineno_ )
176
- if lineno < 0 :
177
- raise ValueError ("number is negative" )
178
- except ValueError as e :
179
- raise Exit (error_template .format (error = f"invalid lineno { lineno_ !r} : { e } " ))
180
- else :
181
- lineno = 0
182
- return action , message , category , module , lineno
183
-
184
-
185
- def _resolve_warning_category (category : str ) -> type [Warning ]:
186
- """Copied from warnings._getcategory, but changed so it lets exceptions (specially
187
- ImportErrors) propagate so we can get access to their tracebacks (#9218)."""
188
- __tracebackhide__ = True
189
- if not category :
190
- return Warning
191
-
192
- if "." not in category :
193
- import builtins as m
194
-
195
- klass = category
196
- else :
197
- module , _ , klass = category .rpartition ("." )
198
- m = __import__ (module , None , None , [klass ])
199
- cat = getattr (m , klass )
200
- if not issubclass (cat , Warning ):
201
- raise Exception (f"{ cat } is not a Warning subclass" )
202
- return cast (type [Warning ], cat )
203
-
204
-
205
- def warning_record_to_str (warning_message : warnings .WarningMessage ) -> str :
206
- """Convert a warnings.WarningMessage to a string."""
207
- msg = warnings .formatwarning (
208
- message = warning_message .message ,
209
- category = warning_message .category ,
210
- filename = warning_message .filename ,
211
- lineno = warning_message .lineno ,
212
- line = warning_message .line ,
213
- )
214
- return msg
215
-
216
-
217
59
class WarningsNameSpace :
218
60
"""A namespace for the warnings plugin."""
219
61
0 commit comments