diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 072a1efcd..642f7b5bb 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1549,7 +1549,6 @@ definitions: anyOf: - "$ref": "#/definitions/JsonDecoder" - "$ref": "#/definitions/XmlDecoder" - - "$ref": "#/definitions/CompositeRawDecoder" $parameters: type: object additionalProperties: true @@ -2133,23 +2132,6 @@ definitions: $parameters: type: object additionalProperties: true - GzipJsonDecoder: - title: GzipJson Decoder - description: Use this if the response is Gzip compressed Json. - type: object - additionalProperties: true - required: - - type - properties: - type: - type: string - enum: [GzipJsonDecoder] - encoding: - type: string - default: utf-8 - $parameters: - type: object - additionalProperties: true ZipfileDecoder: title: Zipfile Decoder description: Decoder for response data that is returned as zipfile(s). @@ -2157,19 +2139,19 @@ definitions: additionalProperties: true required: - type - - parser + - decoder properties: type: type: string enum: [ZipfileDecoder] - parser: + decoder: title: Parser description: Parser to parse the decompressed data from the zipfile(s). anyOf: - - "$ref": "#/definitions/GzipParser" - - "$ref": "#/definitions/JsonParser" - - "$ref": "#/definitions/JsonLineParser" - - "$ref": "#/definitions/CsvParser" + - "$ref": "#/definitions/CsvDecoder" + - "$ref": "#/definitions/GzipDecoder" + - "$ref": "#/definitions/JsonDecoder" + - "$ref": "#/definitions/JsonlDecoder" ListPartitionRouter: title: List Partition Router description: A Partition router that specifies a list of attributes where each attribute describes a portion of the complete data set for a stream. During a sync, each value is iterated over and can be used as input to outbound API requests. @@ -3002,79 +2984,39 @@ definitions: description: Component decoding the response so records can be extracted. anyOf: - "$ref": "#/definitions/CustomDecoder" + - "$ref": "#/definitions/CsvDecoder" + - "$ref": "#/definitions/GzipDecoder" - "$ref": "#/definitions/JsonDecoder" - "$ref": "#/definitions/JsonlDecoder" - "$ref": "#/definitions/IterableDecoder" - "$ref": "#/definitions/XmlDecoder" - - "$ref": "#/definitions/GzipJsonDecoder" - - "$ref": "#/definitions/CompositeRawDecoder" - "$ref": "#/definitions/ZipfileDecoder" $parameters: type: object additionalProperties: true - CompositeRawDecoder: - description: "(This is experimental, use at your own risk)" - type: object - required: - - type - - parser - properties: - type: - type: string - enum: [CompositeRawDecoder] - parser: - anyOf: - - "$ref": "#/definitions/GzipParser" - - "$ref": "#/definitions/JsonParser" - - "$ref": "#/definitions/JsonLineParser" - - "$ref": "#/definitions/CsvParser" - # PARSERS - GzipParser: + GzipDecoder: type: object required: - type - - inner_parser + - decoder properties: type: type: string - enum: [GzipParser] - inner_parser: + enum: [GzipDecoder] + decoder: anyOf: - - "$ref": "#/definitions/JsonLineParser" - - "$ref": "#/definitions/CsvParser" - - "$ref": "#/definitions/JsonParser" - JsonParser: - title: JsonParser - description: Parser used for parsing str, bytes, or bytearray data and returning data in a dictionary format. - type: object - required: - - type - properties: - type: - type: string - enum: [JsonParser] - encoding: - type: string - default: utf-8 - JsonLineParser: - type: object - required: - - type - properties: - type: - type: string - enum: [JsonLineParser] - encoding: - type: string - default: utf-8 - CsvParser: + - "$ref": "#/definitions/CsvDecoder" + - "$ref": "#/definitions/GzipDecoder" + - "$ref": "#/definitions/JsonDecoder" + - "$ref": "#/definitions/JsonlDecoder" + CsvDecoder: type: object required: - type properties: type: type: string - enum: [CsvParser] + enum: [CsvDecoder] encoding: type: string default: utf-8 @@ -3202,24 +3144,24 @@ definitions: description: Component decoding the response so records can be extracted. anyOf: - "$ref": "#/definitions/CustomDecoder" + - "$ref": "#/definitions/CsvDecoder" + - "$ref": "#/definitions/GzipDecoder" - "$ref": "#/definitions/JsonDecoder" - "$ref": "#/definitions/JsonlDecoder" - "$ref": "#/definitions/IterableDecoder" - "$ref": "#/definitions/XmlDecoder" - - "$ref": "#/definitions/GzipJsonDecoder" - - "$ref": "#/definitions/CompositeRawDecoder" - "$ref": "#/definitions/ZipfileDecoder" download_decoder: title: Download Decoder description: Component decoding the download response so records can be extracted. anyOf: - "$ref": "#/definitions/CustomDecoder" + - "$ref": "#/definitions/CsvDecoder" + - "$ref": "#/definitions/GzipDecoder" - "$ref": "#/definitions/JsonDecoder" - "$ref": "#/definitions/JsonlDecoder" - "$ref": "#/definitions/IterableDecoder" - "$ref": "#/definitions/XmlDecoder" - - "$ref": "#/definitions/GzipJsonDecoder" - - "$ref": "#/definitions/CompositeRawDecoder" - "$ref": "#/definitions/ZipfileDecoder" $parameters: type: object diff --git a/airbyte_cdk/sources/declarative/decoders/__init__.py b/airbyte_cdk/sources/declarative/decoders/__init__.py index 45eaf5599..cd91fe758 100644 --- a/airbyte_cdk/sources/declarative/decoders/__init__.py +++ b/airbyte_cdk/sources/declarative/decoders/__init__.py @@ -10,10 +10,8 @@ ) from airbyte_cdk.sources.declarative.decoders.decoder import Decoder from airbyte_cdk.sources.declarative.decoders.json_decoder import ( - GzipJsonDecoder, IterableDecoder, JsonDecoder, - JsonlDecoder, ) from airbyte_cdk.sources.declarative.decoders.noop_decoder import NoopDecoder from airbyte_cdk.sources.declarative.decoders.pagination_decoder_decorator import ( @@ -27,9 +25,7 @@ "CompositeRawDecoder", "JsonDecoder", "JsonParser", - "JsonlDecoder", "IterableDecoder", - "GzipJsonDecoder", "NoopDecoder", "PaginationDecoderDecorator", "XmlDecoder", diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 4d670db11..389679406 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -1,5 +1,6 @@ import csv import gzip +import io import json import logging from abc import ABC, abstractmethod @@ -130,11 +131,15 @@ class CompositeRawDecoder(Decoder): """ parser: Parser + stream_response: bool = True def is_stream_response(self) -> bool: - return True + return self.stream_response def decode( self, response: requests.Response ) -> Generator[MutableMapping[str, Any], None, None]: - yield from self.parser.parse(data=response.raw) # type: ignore[arg-type] + if self.is_stream_response(): + yield from self.parser.parse(data=response.raw) # type: ignore[arg-type] + else: + yield from self.parser.parse(data=io.BytesIO(response.content)) diff --git a/airbyte_cdk/sources/declarative/decoders/json_decoder.py b/airbyte_cdk/sources/declarative/decoders/json_decoder.py index cab572ef4..3533fc5c8 100644 --- a/airbyte_cdk/sources/declarative/decoders/json_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/json_decoder.py @@ -10,21 +10,24 @@ import orjson import requests +from airbyte_cdk.sources.declarative.decoders import CompositeRawDecoder, JsonParser from airbyte_cdk.sources.declarative.decoders.decoder import Decoder logger = logging.getLogger("airbyte") -@dataclass class JsonDecoder(Decoder): """ Decoder strategy that returns the json-encoded content of a response, if any. + + Usually, we would try to instantiate the equivalent `CompositeRawDecoder(parser=JsonParser(), stream_response=False)` but there were specific historical behaviors related to the JsonDecoder that we didn't know if we could remove like the fallback on {} in case of errors. """ - parameters: InitVar[Mapping[str, Any]] + def __init__(self, parameters: Mapping[str, Any]): + self._decoder = CompositeRawDecoder(parser=JsonParser(), stream_response=False) def is_stream_response(self) -> bool: - return False + return self._decoder.is_stream_response() def decode( self, response: requests.Response @@ -32,25 +35,16 @@ def decode( """ Given the response is an empty string or an emtpy list, the function will return a generator with an empty mapping. """ + has_yielded = False try: - body_json = response.json() - yield from self.parse_body_json(body_json) - except requests.exceptions.JSONDecodeError: - logger.warning( - f"Response cannot be parsed into json: {response.status_code=}, {response.text=}" - ) + for element in self._decoder.decode(response): + yield element + has_yielded = True + except Exception: yield {} - @staticmethod - def parse_body_json( - body_json: MutableMapping[str, Any] | List[MutableMapping[str, Any]], - ) -> Generator[MutableMapping[str, Any], None, None]: - if not isinstance(body_json, list): - body_json = [body_json] - if len(body_json) == 0: + if not has_yielded: yield {} - else: - yield from body_json @dataclass @@ -69,43 +63,3 @@ def decode( ) -> Generator[MutableMapping[str, Any], None, None]: for line in response.iter_lines(): yield {"record": line.decode()} - - -@dataclass -class JsonlDecoder(Decoder): - """ - Decoder strategy that returns the json-encoded content of the response, if any. - """ - - parameters: InitVar[Mapping[str, Any]] - - def is_stream_response(self) -> bool: - return True - - def decode( - self, response: requests.Response - ) -> Generator[MutableMapping[str, Any], None, None]: - # TODO???: set delimiter? usually it is `\n` but maybe it would be useful to set optional? - # https://github.com/airbytehq/airbyte-internal-issues/issues/8436 - for record in response.iter_lines(): - yield orjson.loads(record) - - -@dataclass -class GzipJsonDecoder(JsonDecoder): - encoding: Optional[str] - - def __post_init__(self, parameters: Mapping[str, Any]) -> None: - if self.encoding: - try: - codecs.lookup(self.encoding) - except LookupError: - raise ValueError( - f"Invalid encoding '{self.encoding}'. Please check provided encoding" - ) - - def decode( - self, response: requests.Response - ) -> Generator[MutableMapping[str, Any], None, None]: - raw_string = decompress(response.content).decode(encoding=self.encoding or "utf-8") - yield from self.parse_body_json(orjson.loads(raw_string)) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index fe29cee2c..d2454ee78 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -887,15 +887,6 @@ class Config: parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class GzipJsonDecoder(BaseModel): - class Config: - extra = Extra.allow - - type: Literal["GzipJsonDecoder"] - encoding: Optional[str] = "utf-8" - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") - - class MinMaxDatetime(BaseModel): type: Literal["MinMaxDatetime"] datetime: str = Field( @@ -1274,18 +1265,8 @@ class LegacySessionTokenAuthenticator(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class JsonParser(BaseModel): - type: Literal["JsonParser"] - encoding: Optional[str] = "utf-8" - - -class JsonLineParser(BaseModel): - type: Literal["JsonLineParser"] - encoding: Optional[str] = "utf-8" - - -class CsvParser(BaseModel): - type: Literal["CsvParser"] +class CsvDecoder(BaseModel): + type: Literal["CsvDecoder"] encoding: Optional[str] = "utf-8" delimiter: Optional[str] = "," @@ -1680,9 +1661,9 @@ class RecordSelector(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class GzipParser(BaseModel): - type: Literal["GzipParser"] - inner_parser: Union[JsonLineParser, CsvParser, JsonParser] +class GzipDecoder(BaseModel): + type: Literal["GzipDecoder"] + decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonlDecoder] class Spec(BaseModel): @@ -1720,18 +1701,13 @@ class Config: extra = Extra.allow type: Literal["ZipfileDecoder"] - parser: Union[GzipParser, JsonParser, JsonLineParser, CsvParser] = Field( + decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonlDecoder] = Field( ..., description="Parser to parse the decompressed data from the zipfile(s).", title="Parser", ) -class CompositeRawDecoder(BaseModel): - type: Literal["CompositeRawDecoder"] - parser: Union[GzipParser, JsonParser, JsonLineParser, CsvParser] - - class DeclarativeSource1(BaseModel): class Config: extra = Extra.forbid @@ -1933,7 +1909,7 @@ class SessionTokenAuthenticator(BaseModel): description="Authentication method to use for requests sent to the API, specifying how to inject the session token.", title="Data Request Authentication", ) - decoder: Optional[Union[JsonDecoder, XmlDecoder, CompositeRawDecoder]] = Field( + decoder: Optional[Union[JsonDecoder, XmlDecoder]] = Field( None, description="Component used to decode the response.", title="Decoder" ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2133,12 +2109,12 @@ class SimpleRetriever(BaseModel): decoder: Optional[ Union[ CustomDecoder, + CsvDecoder, + GzipDecoder, JsonDecoder, JsonlDecoder, IterableDecoder, XmlDecoder, - GzipJsonDecoder, - CompositeRawDecoder, ZipfileDecoder, ] ] = Field( @@ -2211,12 +2187,12 @@ class AsyncRetriever(BaseModel): decoder: Optional[ Union[ CustomDecoder, + CsvDecoder, + GzipDecoder, JsonDecoder, JsonlDecoder, IterableDecoder, XmlDecoder, - GzipJsonDecoder, - CompositeRawDecoder, ZipfileDecoder, ] ] = Field( @@ -2227,12 +2203,12 @@ class AsyncRetriever(BaseModel): download_decoder: Optional[ Union[ CustomDecoder, + CsvDecoder, + GzipDecoder, JsonDecoder, JsonlDecoder, IterableDecoder, XmlDecoder, - GzipJsonDecoder, - CompositeRawDecoder, ZipfileDecoder, ] ] = Field( @@ -2277,6 +2253,7 @@ class DynamicDeclarativeStream(BaseModel): ComplexFieldType.update_forward_refs() +GzipDecoder.update_forward_refs() CompositeErrorHandler.update_forward_refs() DeclarativeSource1.update_forward_refs() DeclarativeSource2.update_forward_refs() diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index a664b8530..e7b2ac79d 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -60,10 +60,8 @@ from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.declarative.decoders import ( Decoder, - GzipJsonDecoder, IterableDecoder, JsonDecoder, - JsonlDecoder, PaginationDecoderDecorator, XmlDecoder, ZipfileDecoder, @@ -103,8 +101,8 @@ LegacyToPerPartitionStateMigration, ) from airbyte_cdk.sources.declarative.models import ( - Clamping, CustomStateMigration, + GzipDecoder, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( AddedFieldDefinition as AddedFieldDefinitionModel, @@ -142,9 +140,6 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CompositeErrorHandler as CompositeErrorHandlerModel, ) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - CompositeRawDecoder as CompositeRawDecoderModel, -) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ConcurrencyLevel as ConcurrencyLevelModel, ) @@ -155,7 +150,7 @@ ConstantBackoffStrategy as ConstantBackoffStrategyModel, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - CsvParser as CsvParserModel, + CsvDecoder as CsvDecoderModel, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CursorPagination as CursorPaginationModel, @@ -230,10 +225,7 @@ FlattenFields as FlattenFieldsModel, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - GzipJsonDecoder as GzipJsonDecoderModel, -) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - GzipParser as GzipParserModel, + GzipDecoder as GzipDecoderModel, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( HttpComponentsResolver as HttpComponentsResolverModel, @@ -259,12 +251,6 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( JsonlDecoder as JsonlDecoderModel, ) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - JsonLineParser as JsonLineParserModel, -) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - JsonParser as JsonParserModel, -) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( JwtAuthenticator as JwtAuthenticatorModel, ) @@ -531,9 +517,9 @@ def _init_mappings(self) -> None: CheckStreamModel: self.create_check_stream, CheckDynamicStreamModel: self.create_check_dynamic_stream, CompositeErrorHandlerModel: self.create_composite_error_handler, - CompositeRawDecoderModel: self.create_composite_raw_decoder, ConcurrencyLevelModel: self.create_concurrency_level, ConstantBackoffStrategyModel: self.create_constant_backoff_strategy, + CsvDecoderModel: self.create_csv_decoder, CursorPaginationModel: self.create_cursor_pagination, CustomAuthenticatorModel: self.create_custom_component, CustomBackoffStrategyModel: self.create_custom_component, @@ -563,10 +549,7 @@ def _init_mappings(self) -> None: InlineSchemaLoaderModel: self.create_inline_schema_loader, JsonDecoderModel: self.create_json_decoder, JsonlDecoderModel: self.create_jsonl_decoder, - JsonLineParserModel: self.create_json_line_parser, - JsonParserModel: self.create_json_parser, - GzipJsonDecoderModel: self.create_gzipjson_decoder, - GzipParserModel: self.create_gzip_parser, + GzipDecoderModel: self.create_gzip_decoder, KeysToLowerModel: self.create_keys_to_lower_transformation, KeysToSnakeCaseModel: self.create_keys_to_snake_transformation, KeysReplaceModel: self.create_keys_replace_transformation, @@ -2051,25 +2034,26 @@ def create_dynamic_schema_loader( ) @staticmethod - def create_json_decoder(model: JsonDecoderModel, config: Config, **kwargs: Any) -> JsonDecoder: + def create_json_decoder(model: JsonDecoderModel, config: Config, **kwargs: Any) -> Decoder: return JsonDecoder(parameters={}) @staticmethod - def create_json_parser(model: JsonParserModel, config: Config, **kwargs: Any) -> JsonParser: - encoding = model.encoding if model.encoding else "utf-8" - return JsonParser(encoding=encoding) + def create_csv_decoder(model: CsvDecoderModel, config: Config, **kwargs: Any) -> Decoder: + return CompositeRawDecoder( + parser=ModelToComponentFactory._get_parser(model, config), stream_response=True + ) @staticmethod - def create_jsonl_decoder( - model: JsonlDecoderModel, config: Config, **kwargs: Any - ) -> JsonlDecoder: - return JsonlDecoder(parameters={}) + def create_jsonl_decoder(model: JsonlDecoderModel, config: Config, **kwargs: Any) -> Decoder: + return CompositeRawDecoder( + parser=ModelToComponentFactory._get_parser(model, config), stream_response=True + ) @staticmethod - def create_json_line_parser( - model: JsonLineParserModel, config: Config, **kwargs: Any - ) -> JsonLineParser: - return JsonLineParser(encoding=model.encoding) + def create_gzip_decoder(model: GzipDecoderModel, config: Config, **kwargs: Any) -> Decoder: + return CompositeRawDecoder( + parser=ModelToComponentFactory._get_parser(model, config), stream_response=True + ) @staticmethod def create_iterable_decoder( @@ -2081,33 +2065,30 @@ def create_iterable_decoder( def create_xml_decoder(model: XmlDecoderModel, config: Config, **kwargs: Any) -> XmlDecoder: return XmlDecoder(parameters={}) - @staticmethod - def create_gzipjson_decoder( - model: GzipJsonDecoderModel, config: Config, **kwargs: Any - ) -> GzipJsonDecoder: - return GzipJsonDecoder(parameters={}, encoding=model.encoding) - def create_zipfile_decoder( self, model: ZipfileDecoderModel, config: Config, **kwargs: Any ) -> ZipfileDecoder: - parser = self._create_component_from_model(model=model.parser, config=config) - return ZipfileDecoder(parser=parser) - - def create_gzip_parser( - self, model: GzipParserModel, config: Config, **kwargs: Any - ) -> GzipParser: - inner_parser = self._create_component_from_model(model=model.inner_parser, config=config) - return GzipParser(inner_parser=inner_parser) + return ZipfileDecoder(parser=ModelToComponentFactory._get_parser(model.decoder, config)) @staticmethod - def create_csv_parser(model: CsvParserModel, config: Config, **kwargs: Any) -> CsvParser: - return CsvParser(encoding=model.encoding, delimiter=model.delimiter) - - def create_composite_raw_decoder( - self, model: CompositeRawDecoderModel, config: Config, **kwargs: Any - ) -> CompositeRawDecoder: - parser = self._create_component_from_model(model=model.parser, config=config) - return CompositeRawDecoder(parser=parser) + def _get_parser(model: BaseModel, config: Config) -> Parser: + if isinstance(model, JsonDecoderModel): + # Note that the logic is a bit different from the JsonDecoder as there is some legacy that is maintained to return {} on error cases + return JsonParser() + elif isinstance(model, JsonlDecoderModel): + return JsonLineParser() + elif isinstance(model, CsvDecoderModel): + return CsvParser(encoding=model.encoding, delimiter=model.delimiter) + elif isinstance(model, GzipDecoderModel): + return GzipParser( + inner_parser=ModelToComponentFactory._get_parser(model.decoder, config) + ) + elif isinstance( + model, (CustomDecoderModel, IterableDecoderModel, XmlDecoderModel, ZipfileDecoderModel) + ): + raise ValueError(f"Decoder type {model} does not have parser associated to it") + + raise ValueError(f"Unknown decoder type {model}") @staticmethod def create_json_file_schema_loader( diff --git a/unit_tests/sources/declarative/auth/test_token_provider.py b/unit_tests/sources/declarative/auth/test_token_provider.py index 4dd10f116..2958cf04b 100644 --- a/unit_tests/sources/declarative/auth/test_token_provider.py +++ b/unit_tests/sources/declarative/auth/test_token_provider.py @@ -1,7 +1,7 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # - +import json from unittest.mock import MagicMock import freezegun @@ -13,13 +13,12 @@ SessionTokenProvider, ) from airbyte_cdk.sources.declarative.exceptions import ReadException -from airbyte_cdk.utils.datetime_helpers import ab_datetime_now def create_session_token_provider(): login_requester = MagicMock() login_response = MagicMock() - login_response.json.return_value = {"nested": {"token": "my_token"}} + login_response.content = json.dumps({"nested": {"token": "my_token"}}).encode() login_requester.send_request.return_value = login_response return SessionTokenProvider( @@ -56,9 +55,9 @@ def test_session_token_provider_cache_expiration(): provider = create_session_token_provider() provider.get_token() - provider.login_requester.send_request.return_value.json.return_value = { - "nested": {"token": "updated_token"} - } + provider.login_requester.send_request.return_value.content = json.dumps( + {"nested": {"token": "updated_token"}} + ).encode() with freezegun.freeze_time("2001-05-21T14:00:00Z"): assert provider.get_token() == "updated_token" diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index 9da1b8148..524593b56 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -198,3 +198,34 @@ def test_composite_raw_decoder_csv_parser_values(requests_mock, encoding: str, d parsed_records = list(composite_raw_decoder.decode(response)) assert parsed_records == expected_data + + +def test_given_response_already_consumed_when_decode_then_no_data_is_returned(requests_mock): + requests_mock.register_uri( + "GET", "https://airbyte.io/", content=json.dumps({"test": "test"}).encode() + ) + response = requests.get("https://airbyte.io/", stream=True) + composite_raw_decoder = CompositeRawDecoder(parser=JsonParser(encoding="utf-8")) + + content = list(composite_raw_decoder.decode(response)) + assert content + + with pytest.raises(Exception): + list(composite_raw_decoder.decode(response)) + + +def test_given_response_is_not_streamed_when_decode_then_can_be_called_multiple_times( + requests_mock, +): + requests_mock.register_uri( + "GET", "https://airbyte.io/", content=json.dumps({"test": "test"}).encode() + ) + response = requests.get("https://airbyte.io/") + composite_raw_decoder = CompositeRawDecoder( + parser=JsonParser(encoding="utf-8"), stream_response=False + ) + + content = list(composite_raw_decoder.decode(response)) + content_second_time = list(composite_raw_decoder.decode(response)) + + assert content == content_second_time diff --git a/unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py b/unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py index e0cbe4c8c..241b45822 100644 --- a/unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py +++ b/unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py @@ -10,8 +10,6 @@ from airbyte_cdk import YamlDeclarativeSource from airbyte_cdk.models import SyncMode -from airbyte_cdk.sources.declarative.decoders import GzipJsonDecoder -from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder, JsonlDecoder from airbyte_cdk.sources.declarative.models import DeclarativeStream as DeclarativeStreamModel from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ( ModelToComponentFactory, @@ -38,10 +36,6 @@ def large_event_response_fixture(): "decoder_yaml_definition", [ "type: JsonlDecoder", - """type: CompositeRawDecoder - parser: - type: JsonLineParser - """, ], ) def test_jsonl_decoder_memory_usage( diff --git a/unit_tests/sources/declarative/decoders/test_json_decoder.py b/unit_tests/sources/declarative/decoders/test_json_decoder.py index 087619dc9..c78d157ab 100644 --- a/unit_tests/sources/declarative/decoders/test_json_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_json_decoder.py @@ -8,8 +8,9 @@ import pytest import requests -from airbyte_cdk.sources.declarative.decoders import GzipJsonDecoder -from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder, JsonlDecoder +from airbyte_cdk.sources.declarative.decoders import CompositeRawDecoder +from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import JsonLineParser +from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder @pytest.mark.parametrize( @@ -40,8 +41,11 @@ def test_json_decoder(requests_mock, response_body, first_element): ) def test_jsonl_decoder(requests_mock, response_body, expected_json): requests_mock.register_uri("GET", "https://airbyte.io/", text=response_body) - response = requests.get("https://airbyte.io/") - assert list(JsonlDecoder(parameters={}).decode(response)) == expected_json + response = requests.get("https://airbyte.io/", stream=True) + assert ( + list(CompositeRawDecoder(parser=JsonLineParser(), stream_response=True).decode(response)) + == expected_json + ) @pytest.fixture(name="large_events_response") @@ -56,78 +60,3 @@ def large_event_response_fixture(): file.write(jsonl_string) yield (lines_in_response, file_path) os.remove(file_path) - - -@pytest.mark.parametrize( - "encoding", - [ - "utf-8", - "utf", - ], - ids=["utf-8", "utf"], -) -def test_gzipjson_decoder(requests_mock, encoding): - response_to_compress = json.dumps( - [ - { - "campaignId": 214078428, - "campaignName": "sample-campaign-name-214078428", - "adGroupId": "6490134", - "adId": "665320125", - "targetId": "791320341", - "asin": "G000PSH142", - "advertisedAsin": "G000PSH142", - "keywordBid": "511234974", - "keywordId": "965783021", - }, - { - "campaignId": 44504582, - "campaignName": "sample-campaign-name-44504582", - "adGroupId": "6490134", - "adId": "665320125", - "targetId": "791320341", - "asin": "G000PSH142", - "advertisedAsin": "G000PSH142", - "keywordBid": "511234974", - "keywordId": "965783021", - }, - { - "campaignId": 509144838, - "campaignName": "sample-campaign-name-509144838", - "adGroupId": "6490134", - "adId": "665320125", - "targetId": "791320341", - "asin": "G000PSH142", - "advertisedAsin": "G000PSH142", - "keywordBid": "511234974", - "keywordId": "965783021", - }, - { - "campaignId": 231712082, - "campaignName": "sample-campaign-name-231712082", - "adGroupId": "6490134", - "adId": "665320125", - "targetId": "791320341", - "asin": "G000PSH142", - "advertisedAsin": "G000PSH142", - "keywordBid": "511234974", - "keywordId": "965783021", - }, - { - "campaignId": 895306040, - "campaignName": "sample-campaign-name-895306040", - "adGroupId": "6490134", - "adId": "665320125", - "targetId": "791320341", - "asin": "G000PSH142", - "advertisedAsin": "G000PSH142", - "keywordBid": "511234974", - "keywordId": "965783021", - }, - ] - ) - body = gzip.compress(response_to_compress.encode(encoding)) - - requests_mock.register_uri("GET", "https://airbyte.io/", content=body) - response = requests.get("https://airbyte.io/") - assert len(list(GzipJsonDecoder(parameters={}, encoding=encoding).decode(response))) == 5 diff --git a/unit_tests/sources/declarative/extractors/test_dpath_extractor.py b/unit_tests/sources/declarative/extractors/test_dpath_extractor.py index 0d4d8a529..fa216685a 100644 --- a/unit_tests/sources/declarative/extractors/test_dpath_extractor.py +++ b/unit_tests/sources/declarative/extractors/test_dpath_extractor.py @@ -9,10 +9,11 @@ import requests from airbyte_cdk import Decoder +from airbyte_cdk.sources.declarative.decoders import CompositeRawDecoder +from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import JsonLineParser from airbyte_cdk.sources.declarative.decoders.json_decoder import ( IterableDecoder, JsonDecoder, - JsonlDecoder, ) from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor @@ -20,7 +21,7 @@ parameters = {"parameters_field": "record_array"} decoder_json = JsonDecoder(parameters={}) -decoder_jsonl = JsonlDecoder(parameters={}) +decoder_jsonl = CompositeRawDecoder(parser=JsonLineParser(), stream_response=True) decoder_iterable = IterableDecoder(parameters={})