Skip to content

Commit 4518a4d

Browse files
feat: implement proposed fix for GZIP parser selection issue
- Add ImprovedCompositeRawDecoder with auto-detection of GZIP content - Detect GZIP magic bytes (0x1f 0x8b) when Content-Encoding header missing - Provide better error handling for UTF-8 decoding of GZIP data - Add recovery mechanism for StreamThreadException in campaign_labels stream - Create Bing Ads compatible decoder configuration Co-Authored-By: unknown <>
1 parent db8c089 commit 4518a4d

File tree

1 file changed

+171
-0
lines changed

1 file changed

+171
-0
lines changed

fix_gzip_parser_selection.py

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
"""
2+
Proposed fix for StreamThreadException in Bing Ads source connector.
3+
4+
This fix addresses the root cause where GZIP-compressed data is incorrectly
5+
treated as UTF-8 text due to missing Content-Encoding header detection
6+
in the concurrent source framework.
7+
8+
Issue: #8301 - 'utf-8' codec can't decode byte 0x8b in position 1: invalid start byte
9+
"""
10+
11+
import io
12+
import logging
13+
from typing import Any, Dict, Optional
14+
15+
import requests
16+
17+
from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import (
18+
CompositeRawDecoder,
19+
CsvParser,
20+
GzipParser,
21+
Parser,
22+
)
23+
24+
logger = logging.getLogger("airbyte")
25+
26+
27+
class ImprovedCompositeRawDecoder(CompositeRawDecoder):
28+
"""
29+
Enhanced CompositeRawDecoder with better GZIP detection and error handling.
30+
31+
This addresses the StreamThreadException issue by:
32+
1. Auto-detecting GZIP content based on magic bytes
33+
2. Providing better error handling for decompression failures
34+
3. Falling back gracefully when parser selection fails
35+
"""
36+
37+
def __init__(
38+
self,
39+
parser: Parser,
40+
stream_response: bool = True,
41+
parsers_by_header: Optional[Dict[str, Any]] = None,
42+
auto_detect_gzip: bool = True,
43+
) -> None:
44+
super().__init__(parser, stream_response, parsers_by_header)
45+
self._auto_detect_gzip = auto_detect_gzip
46+
47+
def _detect_gzip_content(self, response: requests.Response) -> bool:
48+
"""
49+
Detect if response content is GZIP-compressed by checking magic bytes.
50+
51+
Returns True if the response starts with GZIP magic number (0x1f, 0x8b).
52+
This helps identify GZIP content even when Content-Encoding header is missing.
53+
"""
54+
if not self._auto_detect_gzip:
55+
return False
56+
57+
try:
58+
if hasattr(response, 'raw') and response.raw:
59+
current_pos = response.raw.tell() if hasattr(response.raw, 'tell') else None
60+
61+
magic_bytes = response.raw.read(2)
62+
63+
if current_pos is not None and hasattr(response.raw, 'seek'):
64+
response.raw.seek(current_pos)
65+
elif hasattr(response.raw, 'seek'):
66+
response.raw.seek(0)
67+
68+
return len(magic_bytes) >= 2 and magic_bytes[0] == 0x1f and magic_bytes[1] == 0x8b
69+
70+
elif hasattr(response, 'content') and len(response.content) >= 2:
71+
return response.content[0] == 0x1f and response.content[1] == 0x8b
72+
73+
except Exception as e:
74+
logger.debug(f"Failed to detect GZIP content: {e}")
75+
76+
return False
77+
78+
def _select_parser(self, response: requests.Response) -> Parser:
79+
"""
80+
Enhanced parser selection with GZIP auto-detection.
81+
82+
This method extends the base implementation to:
83+
1. Check Content-Encoding header (existing behavior)
84+
2. Auto-detect GZIP content by magic bytes
85+
3. Wrap parser with GzipParser if GZIP is detected
86+
"""
87+
selected_parser = super()._select_parser(response)
88+
89+
if (not isinstance(selected_parser, GzipParser) and
90+
self._detect_gzip_content(response)):
91+
92+
logger.info("Auto-detected GZIP content without Content-Encoding header, wrapping parser")
93+
94+
return GzipParser(inner_parser=selected_parser)
95+
96+
return selected_parser
97+
98+
def decode(self, response: requests.Response):
99+
"""
100+
Enhanced decode method with better error handling.
101+
102+
Provides more informative error messages and graceful fallback
103+
when decompression or parsing fails.
104+
"""
105+
try:
106+
yield from super().decode(response)
107+
except UnicodeDecodeError as e:
108+
if "can't decode byte 0x8b" in str(e):
109+
error_msg = (
110+
f"UTF-8 decoding failed with GZIP magic byte 0x8b. "
111+
f"This suggests GZIP-compressed data is being treated as UTF-8 text. "
112+
f"Check Content-Encoding headers or enable auto_detect_gzip. "
113+
f"Original error: {e}"
114+
)
115+
logger.error(error_msg)
116+
117+
if self._auto_detect_gzip and self._detect_gzip_content(response):
118+
logger.info("Attempting recovery with GZIP decompression")
119+
gzip_parser = GzipParser(inner_parser=self.parser)
120+
121+
if hasattr(response, 'raw') and hasattr(response.raw, 'seek'):
122+
response.raw.seek(0)
123+
124+
try:
125+
if self.is_stream_response():
126+
response.raw.auto_close = False
127+
yield from gzip_parser.parse(data=response.raw)
128+
response.raw.close()
129+
else:
130+
yield from gzip_parser.parse(data=io.BytesIO(response.content))
131+
return
132+
except Exception as recovery_error:
133+
logger.error(f"GZIP recovery failed: {recovery_error}")
134+
135+
raise RuntimeError(error_msg) from e
136+
else:
137+
raise
138+
except Exception as e:
139+
logger.error(f"Decoder error: {e}")
140+
raise
141+
142+
143+
def create_bing_ads_compatible_decoder() -> ImprovedCompositeRawDecoder:
144+
"""
145+
Create a CompositeRawDecoder configured for Bing Ads bulk streams.
146+
147+
This decoder handles the campaign_labels stream and other bulk streams
148+
that use GZIP compression with CSV data.
149+
"""
150+
csv_parser = CsvParser(encoding="utf-8-sig", set_values_to_none=[""])
151+
152+
gzip_parser = GzipParser(inner_parser=csv_parser)
153+
154+
decoder = ImprovedCompositeRawDecoder.by_headers(
155+
parsers=[({"Content-Encoding"}, {"gzip"}, gzip_parser)],
156+
stream_response=True,
157+
fallback_parser=csv_parser,
158+
)
159+
160+
decoder._auto_detect_gzip = True
161+
162+
return decoder
163+
164+
165+
if __name__ == "__main__":
166+
print("Proposed fix for StreamThreadException in Bing Ads source")
167+
print("This enhanced CompositeRawDecoder provides:")
168+
print("1. Auto-detection of GZIP content by magic bytes")
169+
print("2. Better error handling for UTF-8/GZIP issues")
170+
print("3. Graceful fallback and recovery mechanisms")
171+
print("4. Specific configuration for Bing Ads bulk streams")

0 commit comments

Comments
 (0)