27
27
class ImprovedCompositeRawDecoder (CompositeRawDecoder ):
28
28
"""
29
29
Enhanced CompositeRawDecoder with better GZIP detection and error handling.
30
-
30
+
31
31
This addresses the StreamThreadException issue by:
32
32
1. Auto-detecting GZIP content based on magic bytes
33
33
2. Providing better error handling for decompression failures
34
34
3. Falling back gracefully when parser selection fails
35
35
"""
36
-
36
+
37
37
def __init__ (
38
38
self ,
39
39
parser : Parser ,
@@ -43,62 +43,62 @@ def __init__(
43
43
) -> None :
44
44
super ().__init__ (parser , stream_response , parsers_by_header )
45
45
self ._auto_detect_gzip = auto_detect_gzip
46
-
46
+
47
47
def _detect_gzip_content(self, response: requests.Response) -> bool:
    """
    Report whether the response body starts with the GZIP magic number.

    The check looks at the first two bytes (0x1F, 0x8B) so GZIP payloads can
    be recognised even when the Content-Encoding header is missing.

    The raw stream is only inspected when it can be rewound afterwards.
    Reading from a stream that cannot seek back would permanently consume the
    GZIP header and corrupt the data handed to the parser, so in that case
    detection is skipped and False is returned.

    Args:
        response: The HTTP response whose body should be inspected.

    Returns:
        True if auto-detection is enabled and the body starts with the GZIP
        magic bytes; False otherwise, including when detection is disabled,
        not possible without consuming data, or fails with an error.
    """
    if not self._auto_detect_gzip:
        return False

    try:
        raw = getattr(response, "raw", None)
        if raw:
            # Prefer the stream's own seekable() answer; objects without that
            # method are treated as rewindable only if they expose seek().
            seekable = getattr(raw, "seekable", None)
            can_rewind = seekable() if callable(seekable) else hasattr(raw, "seek")
            if not can_rewind:
                logger.debug("Skipping GZIP detection: response stream cannot be rewound")
                return False

            # Without tell() the best available restore point is the start.
            start_pos = raw.tell() if hasattr(raw, "tell") else 0
            try:
                magic_bytes = raw.read(2)
            finally:
                # Always restore the position, even if read() raised.
                raw.seek(start_pos)

            return tuple(magic_bytes[:2]) == (0x1F, 0x8B)

        content = getattr(response, "content", None)
        if content is not None and len(content) >= 2:
            return content[0] == 0x1F and content[1] == 0x8B

    except Exception as e:
        # Detection is best-effort: any failure means "not detected".
        logger.debug("Failed to detect GZIP content: %s", e)

    return False
77
-
77
+
78
78
def _select_parser(self, response: requests.Response) -> Parser:
    """
    Choose the parser for ``response``, adding GZIP handling when sniffed.

    The parent implementation picks a parser first. If that parser is not
    already a GzipParser and ``_detect_gzip_content`` reports GZIP content,
    the chosen parser is wrapped in a GzipParser; otherwise it is returned
    unchanged.

    Args:
        response: The HTTP response a parser is being selected for.

    Returns:
        The parent's parser, or a GzipParser wrapping it.
    """
    chosen = super()._select_parser(response)

    # The parent already selected GZIP handling; nothing to add.
    if isinstance(chosen, GzipParser):
        return chosen

    if not self._detect_gzip_content(response):
        return chosen

    logger.info(
        "Auto-detected GZIP content without Content-Encoding header, wrapping parser"
    )
    return GzipParser(inner_parser=chosen)
97
-
97
+
98
98
def decode (self , response : requests .Response ):
99
99
"""
100
100
Enhanced decode method with better error handling.
101
-
101
+
102
102
Provides more informative error messages and graceful fallback
103
103
when decompression or parsing fails.
104
104
"""
@@ -113,14 +113,14 @@ def decode(self, response: requests.Response):
113
113
f"Original error: { e } "
114
114
)
115
115
logger .error (error_msg )
116
-
116
+
117
117
if self ._auto_detect_gzip and self ._detect_gzip_content (response ):
118
118
logger .info ("Attempting recovery with GZIP decompression" )
119
119
gzip_parser = GzipParser (inner_parser = self .parser )
120
-
121
- if hasattr (response , ' raw' ) and hasattr (response .raw , ' seek' ):
120
+
121
+ if hasattr (response , " raw" ) and hasattr (response .raw , " seek" ):
122
122
response .raw .seek (0 )
123
-
123
+
124
124
try :
125
125
if self .is_stream_response ():
126
126
response .raw .auto_close = False
@@ -131,7 +131,7 @@ def decode(self, response: requests.Response):
131
131
return
132
132
except Exception as recovery_error :
133
133
logger .error (f"GZIP recovery failed: { recovery_error } " )
134
-
134
+
135
135
raise RuntimeError (error_msg ) from e
136
136
else :
137
137
raise
@@ -143,22 +143,22 @@ def decode(self, response: requests.Response):
143
143
def create_bing_ads_compatible_decoder() -> ImprovedCompositeRawDecoder:
    """
    Build the decoder used for Bing Ads bulk streams.

    Intended for the campaign_labels stream and other bulk streams that
    deliver GZIP-compressed CSV data. Responses whose Content-Encoding header
    is "gzip" are routed to a GzipParser wrapping the CSV parser; everything
    else falls back to the plain CSV parser. GZIP auto-detection is switched
    on for the returned decoder.

    Returns:
        A streaming ImprovedCompositeRawDecoder with auto-detection enabled.
    """
    plain_csv = CsvParser(encoding="utf-8-sig", set_values_to_none=[""])

    # Single routing rule: Content-Encoding: gzip -> decompress, then parse CSV.
    header_routes = [
        ({"Content-Encoding"}, {"gzip"}, GzipParser(inner_parser=plain_csv)),
    ]

    bulk_decoder = ImprovedCompositeRawDecoder.by_headers(
        parsers=header_routes,
        stream_response=True,
        fallback_parser=plain_csv,
    )

    # by_headers is not given the flag here, so enable auto-detection afterwards.
    bulk_decoder._auto_detect_gzip = True
    return bulk_decoder
163
163
164
164
0 commit comments