diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 70c417054..a4fed9754 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -128,7 +128,7 @@ def __init__( component_factory if component_factory else ModelToComponentFactory( - emit_connector_builder_messages, + emit_connector_builder_messages=emit_connector_builder_messages, max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"), ) ) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 3dd5e1b00..4585ab47f 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -510,7 +510,6 @@ AsyncRetriever, LazySimpleRetriever, SimpleRetriever, - SimpleRetrieverTestReadDecorator, ) from airbyte_cdk.sources.declarative.retrievers.file_uploader import ( ConnectorBuilderFileUploader, @@ -530,7 +529,10 @@ ) from airbyte_cdk.sources.declarative.schema.composite_schema_loader import CompositeSchemaLoader from airbyte_cdk.sources.declarative.spec import ConfigMigration, Spec -from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicer +from airbyte_cdk.sources.declarative.stream_slicers import ( + StreamSlicer, + StreamSlicerTestReadDecorator, +) from airbyte_cdk.sources.declarative.transformations import ( AddFields, RecordTransformation, @@ -3241,6 +3243,14 @@ def _get_url() -> str: request_options_provider = DefaultRequestOptionsProvider(parameters={}) stream_slicer = stream_slicer or SinglePartitionRouter(parameters={}) + if self._should_limit_slices_fetched(): + stream_slicer = cast( + StreamSlicer, + StreamSlicerTestReadDecorator( + wrapped_slicer=stream_slicer, + maximum_number_of_slices=self._limit_slices_fetched or 5, + ), + ) cursor_used_for_stop_condition = cursor if stop_condition_on_cursor else None paginator = ( @@ -3299,22 +3309,6 @@ def _get_url() -> str: parameters=model.parameters or {}, ) - if self._limit_slices_fetched or self._emit_connector_builder_messages: - return SimpleRetrieverTestReadDecorator( - name=name, - paginator=paginator, - primary_key=primary_key, - requester=requester, - record_selector=record_selector, - stream_slicer=stream_slicer, - request_option_provider=request_options_provider, - cursor=cursor, - config=config, - maximum_number_of_slices=self._limit_slices_fetched or 5, - ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests, - log_formatter=log_formatter, - parameters=model.parameters or {}, - ) return SimpleRetriever( name=name, paginator=paginator, @@ -3327,9 +3321,35 @@ def _get_url() -> str: config=config, ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests, additional_query_properties=query_properties, + log_formatter=self._get_log_formatter(log_formatter, name), parameters=model.parameters or {}, ) + def _get_log_formatter( + self, log_formatter: Callable[[Response], Any] | None, name: str + ) -> Callable[[Response], Any] | None: + if self._should_limit_slices_fetched(): + return ( + ( + lambda response: format_http_message( + response, + f"Stream '{name}' request", + f"Request performed in order to extract records for stream '{name}'", + name, + ) + ) + if not log_formatter + else log_formatter + ) + return None + + def _should_limit_slices_fetched(self) -> bool: + """ + Returns True if the number of slices fetched should be limited, False otherwise. + This is used to limit the number of slices fetched during tests. + """ + return bool(self._limit_slices_fetched or self._emit_connector_builder_messages) + @staticmethod def _query_properties_in_request_parameters( requester: Union[HttpRequesterModel, CustomRequesterModel], @@ -3420,7 +3440,7 @@ def create_async_retriever( transformations: List[RecordTransformation], **kwargs: Any, ) -> AsyncRetriever: - def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetriever: + def _get_download_retriever() -> SimpleRetriever: record_selector = RecordSelector( extractor=download_extractor, name=name, @@ -3440,19 +3460,6 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie if model.download_paginator else NoPagination(parameters={}) ) - maximum_number_of_slices = self._limit_slices_fetched or 5 - - if self._limit_slices_fetched or self._emit_connector_builder_messages: - return SimpleRetrieverTestReadDecorator( - requester=download_requester, - record_selector=record_selector, - primary_key=None, - name=job_download_components_name, - paginator=paginator, - config=config, - parameters={}, - maximum_number_of_slices=maximum_number_of_slices, - ) return SimpleRetriever( requester=download_requester, @@ -3498,7 +3505,17 @@ def _get_job_timeout() -> datetime.timedelta: transformations=transformations, client_side_incremental_sync=client_side_incremental_sync, ) + stream_slicer = stream_slicer or SinglePartitionRouter(parameters={}) + if self._should_limit_slices_fetched(): + stream_slicer = cast( + StreamSlicer, + StreamSlicerTestReadDecorator( + wrapped_slicer=stream_slicer, + maximum_number_of_slices=self._limit_slices_fetched or 5, + ), + ) + creation_requester = self._create_component_from_model( model=model.creation_requester, decoder=decoder, diff --git a/airbyte_cdk/sources/declarative/retrievers/__init__.py b/airbyte_cdk/sources/declarative/retrievers/__init__.py index 5b26220e0..7349efcd5 100644 --- a/airbyte_cdk/sources/declarative/retrievers/__init__.py +++ b/airbyte_cdk/sources/declarative/retrievers/__init__.py @@ -7,13 +7,11 @@ from airbyte_cdk.sources.declarative.retrievers.simple_retriever import ( LazySimpleRetriever, SimpleRetriever, - SimpleRetrieverTestReadDecorator, ) __all__ = [ "Retriever", "SimpleRetriever", - "SimpleRetrieverTestReadDecorator", "AsyncRetriever", "LazySimpleRetriever", ] diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 5bd68e5bd..4ee2422cd 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -645,40 +645,6 @@ def _deep_merge( target[key] = value -@dataclass -class SimpleRetrieverTestReadDecorator(SimpleRetriever): - """ - In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of - slices that are queried throughout a read command. - """ - - maximum_number_of_slices: int = 5 - - def __post_init__(self, options: Mapping[str, Any]) -> None: - super().__post_init__(options) - self.log_formatter = ( - ( - lambda response: format_http_message( - response, - f"Stream '{self.name}' request", - f"Request performed in order to extract records for stream '{self.name}'", - self.name, - ) - ) - if not self.log_formatter - else self.log_formatter - ) - - if self.maximum_number_of_slices and self.maximum_number_of_slices < 1: - raise ValueError( - f"The maximum number of slices on a test read needs to be strictly positive. Got {self.maximum_number_of_slices}" - ) - - # stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever - def stream_slices(self) -> Iterable[Optional[StreamSlice]]: # type: ignore - return islice(super().stream_slices(), self.maximum_number_of_slices) - - @deprecated( "This class is experimental. Use at your own risk.", category=ExperimentalClassWarning, diff --git a/airbyte_cdk/sources/declarative/stream_slicers/__init__.py b/airbyte_cdk/sources/declarative/stream_slicers/__init__.py index 7bacc3ca8..10879dcf6 100644 --- a/airbyte_cdk/sources/declarative/stream_slicers/__init__.py +++ b/airbyte_cdk/sources/declarative/stream_slicers/__init__.py @@ -3,5 +3,8 @@ # from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer +from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer_test_read_decorator import ( + StreamSlicerTestReadDecorator, +) -__all__ = ["StreamSlicer"] +__all__ = ["StreamSlicer", "StreamSlicerTestReadDecorator"] diff --git a/airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py b/airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py new file mode 100644 index 000000000..323c89196 --- /dev/null +++ b/airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py @@ -0,0 +1,28 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + +from dataclasses import dataclass +from itertools import islice +from typing import Any, Iterable, Mapping, Optional, Union + +from airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer import StreamSlicer +from airbyte_cdk.sources.types import StreamSlice, StreamState + + +@dataclass +class StreamSlicerTestReadDecorator(StreamSlicer): + """ + In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of + slices that are queried throughout a read command. + """ + + wrapped_slicer: StreamSlicer + maximum_number_of_slices: int = 5 + + def stream_slices(self) -> Iterable[StreamSlice]: + return islice(self.wrapped_slicer.stream_slices(), self.maximum_number_of_slices) + + def __getattr__(self, name: str) -> Any: + # Delegate everything else to the wrapped object + return getattr(self.wrapped_slicer, name) diff --git a/airbyte_cdk/sources/streams/concurrent/partitions/stream_slicer.py b/airbyte_cdk/sources/streams/concurrent/partitions/stream_slicer.py index 98ac04ed7..c4e07622b 100644 --- a/airbyte_cdk/sources/streams/concurrent/partitions/stream_slicer.py +++ b/airbyte_cdk/sources/streams/concurrent/partitions/stream_slicer.py @@ -1,12 +1,30 @@ # Copyright (c) 2024 Airbyte, Inc., all rights reserved. -from abc import ABC, abstractmethod -from typing import Iterable +from abc import ABC, ABCMeta, abstractmethod +from typing import Any, Iterable from airbyte_cdk.sources.types import StreamSlice -class StreamSlicer(ABC): +class StreamSlicerMeta(ABCMeta): + """ + Metaclass for wrapper scenario that allows it to be used as a type check for StreamSlicer. + This is necessary because StreamSlicerTestReadDecorator wraps a StreamSlicer and we want to be able to check + if an instance is a StreamSlicer, even if it is wrapped in a StreamSlicerTestReadDecorator. + + For example in ConcurrentDeclarativeSource, we do things like: + isinstance(declarative_stream.retriever.stream_slicer,(GlobalSubstreamCursor, PerPartitionWithGlobalCursor)) + """ + + def __instancecheck__(cls, instance: Any) -> bool: + # Check if it's our wrapper with matching wrapped class + if hasattr(instance, "wrapped_slicer"): + return isinstance(instance.wrapped_slicer, cls) + + return super().__instancecheck__(instance) + + +class StreamSlicer(ABC, metaclass=StreamSlicerMeta): """ Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization. """ diff --git a/unit_tests/connector_builder/test_connector_builder_handler.py b/unit_tests/connector_builder/test_connector_builder_handler.py index ba19126de..cd0f8f9b1 100644 --- a/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/unit_tests/connector_builder/test_connector_builder_handler.py @@ -58,8 +58,8 @@ from airbyte_cdk.models import Type as MessageType from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource -from airbyte_cdk.sources.declarative.retrievers import SimpleRetrieverTestReadDecorator from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever +from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicerTestReadDecorator from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets, update_secrets from unit_tests.connector_builder.utils import create_configured_catalog @@ -1112,7 +1112,8 @@ def test_read_source(mock_http_stream): streams = source.streams(config) for s in streams: - assert isinstance(s.retriever, SimpleRetrieverTestReadDecorator) + assert isinstance(s.retriever, SimpleRetriever) + assert isinstance(s.retriever.stream_slicer, StreamSlicerTestReadDecorator) @patch.object( @@ -1158,7 +1159,8 @@ def test_read_source_single_page_single_slice(mock_http_stream): streams = source.streams(config) for s in streams: - assert isinstance(s.retriever, SimpleRetrieverTestReadDecorator) + assert isinstance(s.retriever, SimpleRetriever) + assert isinstance(s.retriever.stream_slicer, StreamSlicerTestReadDecorator) @pytest.mark.parametrize( diff --git a/unit_tests/connector_builder/test_property_chunking.py b/unit_tests/connector_builder/test_property_chunking.py new file mode 100644 index 000000000..9998c71fd --- /dev/null +++ b/unit_tests/connector_builder/test_property_chunking.py @@ -0,0 +1,243 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + +import copy +import json + +import freezegun + +from airbyte_cdk.connector_builder.connector_builder_handler import ( + TestLimits, +) +from airbyte_cdk.connector_builder.main import ( + handle_connector_builder_request, +) +from airbyte_cdk.models import ( + AirbyteStateBlob, + AirbyteStateMessage, + AirbyteStreamState, + ConfiguredAirbyteCatalogSerializer, + Level, + StreamDescriptor, +) +from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource +from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse +from airbyte_cdk.test.mock_http.response_builder import find_template + +BASE_URL = "https://api.apilayer.com/exchangerates_data/" +FREEZE_DATE = "2025-05-23" + +PROPERTY_KEY = "test" +PROPERTY_LIST = ["one", "two", "three", "four"] + +MANIFEST = { + "version": "6.48.15", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["Rates"]}, + "streams": [ + { + "type": "DeclarativeStream", + "name": "Rates", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "path": "/exchangerates_data/{{stream_interval.start_time}}", + "url_base": "https://api.apilayer.com", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "api_token": "{{ config['api_key'] }}", + "inject_into": { + "type": "RequestOption", + "field_name": "apikey", + "inject_into": "header", + }, + }, + "request_parameters": { + "base": "{{ config['base'] }}", + PROPERTY_KEY: { + "type": "QueryProperties", + "property_list": PROPERTY_LIST, + "property_chunking": { + "type": "PropertyChunking", + "property_limit_type": "property_count", + "property_limit": 2, + }, + }, + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "primary_key": [], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/schema#", + "properties": { + "base": {"type": "string"}, + "date": {"type": "string"}, + "rates": { + "type": "object", + "properties": {"fake_currency": {"type": "number"}}, + }, + "success": {"type": "boolean"}, + "timestamp": {"type": "number"}, + "historical": {"type": "boolean"}, + }, + }, + }, + "transformations": [], + "incremental_sync": { + "type": "DatetimeBasedCursor", + "step": "P1D", + "cursor_field": "date", + "end_datetime": { + "type": "MinMaxDatetime", + "datetime": "{{ now_utc().strftime('%Y-%m-%dT%H:%M:%SZ') }}", + "datetime_format": "%Y-%m-%dT%H:%M:%SZ", + }, + "start_datetime": { + "type": "MinMaxDatetime", + "datetime": "{{ config['start_date'] }}", + "datetime_format": "%Y-%m-%dT%H:%M:%SZ", + }, + "datetime_format": "%Y-%m-%d", + "cursor_granularity": "P1D", + "cursor_datetime_formats": ["%Y-%m-%d"], + }, + "state_migrations": [], + } + ], + "spec": { + "type": "Spec", + "documentation_url": "https://example.org", + "connection_specification": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "required": ["start_date", "api_key", "base"], + "properties": { + "base": {"type": "string", "order": 2, "title": "Base"}, + "api_key": { + "type": "string", + "order": 1, + "title": "API Key", + "airbyte_secret": True, + }, + "start_date": { + "type": "string", + "order": 0, + "title": "Start date", + "format": "date-time", + "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$", + }, + }, + "additionalProperties": True, + }, + }, + "metadata": { + "testedStreams": { + "Rates": { + "hasRecords": False, + "streamHash": "4dce031d602258dd3bcc478731d6862a5cdeb70f", + "hasResponse": False, + "primaryKeysAreUnique": False, + "primaryKeysArePresent": False, + "responsesAreSuccessful": False, + } + }, + "autoImportSchema": {"Rates": True}, + }, + "dynamic_streams": [], +} + +_stream_name = "Rates" + +_A_STATE = [ + AirbyteStateMessage( + type="STREAM", + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name=_stream_name), + stream_state=AirbyteStateBlob({"key": "value"}), + ), + ) +] + +TEST_READ_CONFIG = { + "__injected_declarative_manifest": MANIFEST, + "__command": "test_read", + "__test_read_config": {"max_pages_per_slice": 2, "max_slices": 5, "max_records": 10}, +} + +CONFIGURED_CATALOG = { + "streams": [ + { + "stream": { + "name": _stream_name, + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": {}, + }, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": False, + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite", + } + ] +} + + +@freezegun.freeze_time(f"{FREEZE_DATE}T00:00:00Z") +def test_read(): + conversion_base = "USD" + config = copy.deepcopy(TEST_READ_CONFIG) + config["start_date"] = f"{FREEZE_DATE}T00:00:00Z" + config["base"] = conversion_base + config["api_key"] = "test_api_key" + + stream_url = f"{BASE_URL}{FREEZE_DATE}?base={conversion_base}&{PROPERTY_KEY}=" + + with HttpMocker() as http_mocker: + source = ManifestDeclarativeSource( + source_config=MANIFEST, emit_connector_builder_messages=True + ) + limits = TestLimits() + + http_mocker.get( + HttpRequest(url=f"{stream_url}{PROPERTY_LIST[0]}%2C{PROPERTY_LIST[1]}"), + HttpResponse( + json.dumps(find_template("declarative/property_chunking/rates_one_two", __file__)), + 200, + ), + ) + http_mocker.get( + HttpRequest(url=f"{stream_url}{PROPERTY_LIST[2]}%2C{PROPERTY_LIST[3]}"), + HttpResponse( + json.dumps( + find_template("declarative/property_chunking/rates_three_four", __file__) + ), + 200, + ), + ) + output_record = handle_connector_builder_request( + source, + "test_read", + config, + ConfiguredAirbyteCatalogSerializer.load(CONFIGURED_CATALOG), + _A_STATE, + limits, + ) + # for connector build we get each record in a single page + assert len(output_record.record.data["slices"][0]["pages"]) == 2 + for current_log in output_record.record.data["logs"]: + assert not "Something went wrong in the connector" in current_log["message"] + assert not current_log["internal_message"] + assert not current_log["level"] == Level.ERROR + assert not current_log["stacktrace"] diff --git a/unit_tests/resource/http/response/declarative/property_chunking/rates_one_two.json b/unit_tests/resource/http/response/declarative/property_chunking/rates_one_two.json new file mode 100644 index 000000000..4fa000473 --- /dev/null +++ b/unit_tests/resource/http/response/declarative/property_chunking/rates_one_two.json @@ -0,0 +1,10 @@ +{ + "success": true, + "timestamp": 1641081599, + "historical": true, + "base": "USD", + "date": "2022-01-01", + "rates": { + "fake_currency": 1.2345 + } +} \ No newline at end of file diff --git a/unit_tests/resource/http/response/declarative/property_chunking/rates_three_four.json b/unit_tests/resource/http/response/declarative/property_chunking/rates_three_four.json new file mode 100644 index 000000000..0a121f492 --- /dev/null +++ b/unit_tests/resource/http/response/declarative/property_chunking/rates_three_four.json @@ -0,0 +1,10 @@ +{ + "success": true, + "timestamp": 1641167999, + "historical": true, + "base": "USD", + "date": "2022-01-02", + "rates": { + "fake_currency": 1.2345 + } +} \ No newline at end of file diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 0b97ee453..9231ae26b 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -151,15 +151,12 @@ ) from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod -from airbyte_cdk.sources.declarative.retrievers import ( - AsyncRetriever, - SimpleRetriever, - SimpleRetrieverTestReadDecorator, -) +from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, SimpleRetriever from airbyte_cdk.sources.declarative.schema import InlineSchemaLoader, JsonFileSchemaLoader from airbyte_cdk.sources.declarative.schema.composite_schema_loader import CompositeSchemaLoader from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader from airbyte_cdk.sources.declarative.spec import Spec +from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicerTestReadDecorator from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource @@ -2715,6 +2712,13 @@ def test_simple_retriever_emit_log_messages(): "path": "/v1/api", }, } + request = requests.PreparedRequest() + request.headers = {"header": "value"} + request.url = "http://byrde.enterprises.com/casinos" + + response = requests.Response() + response.request = request + response.status_code = 200 connector_builder_factory = ModelToComponentFactory(emit_connector_builder_messages=True) retriever = connector_builder_factory.create_component( @@ -2727,8 +2731,13 @@ def test_simple_retriever_emit_log_messages(): transformations=[], ) - assert isinstance(retriever, SimpleRetrieverTestReadDecorator) + assert isinstance(retriever, SimpleRetriever) assert connector_builder_factory._message_repository._log_level == Level.DEBUG + assert retriever.log_formatter is not None + assert retriever.log_formatter(response) == connector_builder_factory._get_log_formatter( + None, retriever.name + )(response) + assert isinstance(retriever.stream_slicer, StreamSlicerTestReadDecorator) def test_create_page_increment(): @@ -3078,7 +3087,8 @@ def test_use_request_options_provider_for_datetime_based_cursor(): assert retriever.name == "Test" assert isinstance(retriever.cursor, DatetimeBasedCursor) - assert isinstance(retriever.stream_slicer, DatetimeBasedCursor) + assert isinstance(retriever.stream_slicer, StreamSlicerTestReadDecorator) + assert isinstance(retriever.stream_slicer.wrapped_slicer, DatetimeBasedCursor) assert isinstance(retriever.request_option_provider, DatetimeBasedRequestOptionsProvider) assert ( @@ -3166,7 +3176,8 @@ def test_do_not_separate_request_options_provider_for_non_datetime_based_cursor( assert retriever.name == "Test" assert isinstance(retriever.cursor, PerPartitionCursor) - assert isinstance(retriever.stream_slicer, PerPartitionCursor) + assert isinstance(retriever.stream_slicer, StreamSlicerTestReadDecorator) + assert isinstance(retriever.stream_slicer.wrapped_slicer, PerPartitionCursor) assert isinstance(retriever.request_option_provider, PerPartitionCursor) assert isinstance(retriever.request_option_provider._cursor_factory, CursorFactory) @@ -3207,7 +3218,8 @@ def test_use_default_request_options_provider(): assert retriever.primary_key == "id" assert retriever.name == "Test" - assert isinstance(retriever.stream_slicer, SinglePartitionRouter) + assert isinstance(retriever.stream_slicer, StreamSlicerTestReadDecorator) + assert isinstance(retriever.stream_slicer.wrapped_slicer, SinglePartitionRouter) assert isinstance(retriever.request_option_provider, DefaultRequestOptionsProvider) diff --git a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index 76a94aec3..0f9cc66ff 100644 --- a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -40,10 +40,8 @@ ) from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod -from airbyte_cdk.sources.declarative.retrievers.simple_retriever import ( - SimpleRetriever, - SimpleRetrieverTestReadDecorator, -) +from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever +from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicerTestReadDecorator from airbyte_cdk.sources.types import Record, StreamSlice from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer @@ -704,14 +702,17 @@ def test_limit_stream_slices(): maximum_number_of_slices = 4 stream_slicer = MagicMock() stream_slicer.stream_slices.return_value = _generate_slices(maximum_number_of_slices * 2) - retriever = SimpleRetrieverTestReadDecorator( + stream_slicer_wrapped = StreamSlicerTestReadDecorator( + wrapped_slicer=stream_slicer, + maximum_number_of_slices=maximum_number_of_slices, + ) + retriever = SimpleRetriever( name="stream_name", primary_key=primary_key, requester=MagicMock(), paginator=MagicMock(), record_selector=MagicMock(), - stream_slicer=stream_slicer, - maximum_number_of_slices=maximum_number_of_slices, + stream_slicer=stream_slicer_wrapped, parameters={}, config={}, ) @@ -876,55 +877,6 @@ def test_given_state_selector_when_read_records_use_stream_state(http_stream_rea http_stream_read_pages.assert_called_once_with(mocker.ANY, A_STREAM_STATE, A_STREAM_SLICE) -def test_emit_log_request_response_messages(mocker): - record_selector = MagicMock() - record_selector.select_records.return_value = records - - request = requests.PreparedRequest() - request.headers = {"header": "value"} - request.url = "http://byrde.enterprises.com/casinos" - - response = requests.Response() - response.request = request - response.status_code = 200 - - format_http_message_mock = mocker.patch( - "airbyte_cdk.sources.declarative.retrievers.simple_retriever.format_http_message" - ) - requester = MagicMock() - - # Add __name__ to mock methods - requester.get_request_params.__name__ = "get_request_params" - requester.get_request_headers.__name__ = "get_request_headers" - requester.get_request_body_data.__name__ = "get_request_body_data" - requester.get_request_body_json.__name__ = "get_request_body_json" - - # The paginator mock also needs __name__ attributes - paginator = MagicMock() - paginator.get_request_params.__name__ = "get_request_params" - paginator.get_request_headers.__name__ = "get_request_headers" - paginator.get_request_body_data.__name__ = "get_request_body_data" - paginator.get_request_body_json.__name__ = "get_request_body_json" - - retriever = SimpleRetrieverTestReadDecorator( - name="stream_name", - primary_key=primary_key, - requester=requester, - paginator=paginator, - record_selector=record_selector, - stream_slicer=SinglePartitionRouter(parameters={}), - parameters={}, - config={}, - ) - - retriever._fetch_next_page( - stream_state={}, stream_slice=StreamSlice(cursor_slice={}, partition={}) - ) - - assert retriever.log_formatter is not None - assert retriever.log_formatter(response) == format_http_message_mock.return_value - - def test_retriever_last_page_size_for_page_increment(): requester = MagicMock() requester.send_request.return_value = MagicMock() diff --git a/unit_tests/sources/declarative/stream_slicers/test_stream_slicer_read_decorator.py b/unit_tests/sources/declarative/stream_slicers/test_stream_slicer_read_decorator.py new file mode 100644 index 000000000..05edf31cf --- /dev/null +++ b/unit_tests/sources/declarative/stream_slicers/test_stream_slicer_read_decorator.py @@ -0,0 +1,269 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + +from unittest.mock import Mock + +from airbyte_cdk.sources.declarative.async_job.job_orchestrator import ( + AsyncJobOrchestrator, +) +from airbyte_cdk.sources.declarative.async_job.job_tracker import JobTracker +from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime +from airbyte_cdk.sources.declarative.incremental import ( + CursorFactory, + DatetimeBasedCursor, + GlobalSubstreamCursor, + PerPartitionWithGlobalCursor, +) +from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor +from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import ( + StreamSlice, +) +from airbyte_cdk.sources.declarative.interpolation import InterpolatedString +from airbyte_cdk.sources.declarative.models import ( + CustomRetriever, + DeclarativeStream, + ParentStreamConfig, +) +from airbyte_cdk.sources.declarative.partition_routers import ( + AsyncJobPartitionRouter, + SubstreamPartitionRouter, +) +from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter +from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import ( + SinglePartitionRouter, +) +from airbyte_cdk.sources.declarative.stream_slicers import ( + StreamSlicer, + StreamSlicerTestReadDecorator, +) +from airbyte_cdk.sources.message import NoopMessageRepository +from unit_tests.sources.declarative.async_job.test_integration import MockAsyncJobRepository + +CURSOR_SLICE_FIELD = "cursor slice field" +_NO_LIMIT = 10000 +DATE_FORMAT = "%Y-%m-%d" + + +class MockedCursorBuilder: + def __init__(self): + self._stream_slices = [] + self._stream_state = {} + + def with_stream_slices(self, stream_slices): + self._stream_slices = stream_slices + return self + + def with_stream_state(self, stream_state): + self._stream_state = stream_state + return self + + def build(self): + cursor = Mock(spec=DeclarativeCursor) + cursor.get_stream_state.return_value = self._stream_state + cursor.stream_slices.return_value = self._stream_slices + return cursor + + +def mocked_partition_router(): + return Mock(spec=PartitionRouter) + + +def date_time_based_cursor_factory() -> DatetimeBasedCursor: + return DatetimeBasedCursor( + start_datetime=MinMaxDatetime( + datetime="2021-01-01", datetime_format=DATE_FORMAT, parameters={} + ), + end_datetime=MinMaxDatetime( + datetime="2021-01-05", datetime_format=DATE_FORMAT, parameters={} + ), + step="P10Y", + cursor_field=InterpolatedString.create("created_at", parameters={}), + datetime_format=DATE_FORMAT, + cursor_granularity="P1D", + config={}, + parameters={}, + ) + + +def create_substream_partition_router(): + return SubstreamPartitionRouter( + config={}, + parameters={}, + parent_stream_configs=[ + ParentStreamConfig( + type="ParentStreamConfig", + parent_key="id", + partition_field="id", + stream=DeclarativeStream( + type="DeclarativeStream", + retriever=CustomRetriever(type="CustomRetriever", class_name="a_class_name"), + ), + ) + ], + ) + + +def test_isinstance_global_cursor(): + first_partition = {"first_partition_key": "first_partition_value"} + partition_router = mocked_partition_router() + partition_router.stream_slices.return_value = [ + StreamSlice( + partition=first_partition, cursor_slice={}, extra_fields={"extra_field": "extra_value"} + ), + ] + cursor = ( + MockedCursorBuilder() + .with_stream_slices([{CURSOR_SLICE_FIELD: "first slice cursor value"}]) + .build() + ) + + global_cursor = GlobalSubstreamCursor(cursor, partition_router) + wrapped_slicer = StreamSlicerTestReadDecorator( + wrapped_slicer=global_cursor, + maximum_number_of_slices=5, + ) + assert isinstance(wrapped_slicer, GlobalSubstreamCursor) + assert isinstance(wrapped_slicer.wrapped_slicer, GlobalSubstreamCursor) + assert isinstance(wrapped_slicer, StreamSlicerTestReadDecorator) + + assert not isinstance(wrapped_slicer.wrapped_slicer, StreamSlicerTestReadDecorator) + assert not isinstance(wrapped_slicer, AsyncJobPartitionRouter) + assert not isinstance(wrapped_slicer.wrapped_slicer, AsyncJobPartitionRouter) + assert not isinstance(wrapped_slicer, PerPartitionWithGlobalCursor) + assert not isinstance(wrapped_slicer.wrapped_slicer, PerPartitionWithGlobalCursor) + assert not isinstance(wrapped_slicer, SubstreamPartitionRouter) + assert not isinstance(wrapped_slicer.wrapped_slicer, SubstreamPartitionRouter) + + assert isinstance(global_cursor, GlobalSubstreamCursor) + assert not isinstance(global_cursor, StreamSlicerTestReadDecorator) + assert not isinstance(global_cursor, AsyncJobPartitionRouter) + assert not isinstance(global_cursor, PerPartitionWithGlobalCursor) + assert not isinstance(global_cursor, SubstreamPartitionRouter) + + +def test_isinstance_global_cursor_aysnc_job_partition_router(): + async_job_partition_router = AsyncJobPartitionRouter( + stream_slicer=SinglePartitionRouter(parameters={}), + job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator( + MockAsyncJobRepository(), + stream_slices, + JobTracker(_NO_LIMIT), + NoopMessageRepository(), + ), + config={}, + parameters={}, + ) + + wrapped_slicer = StreamSlicerTestReadDecorator( + wrapped_slicer=async_job_partition_router, + maximum_number_of_slices=5, + ) + assert isinstance(wrapped_slicer, AsyncJobPartitionRouter) + assert isinstance(wrapped_slicer.wrapped_slicer, AsyncJobPartitionRouter) + assert isinstance(wrapped_slicer, StreamSlicerTestReadDecorator) + + assert not isinstance(wrapped_slicer.wrapped_slicer, StreamSlicerTestReadDecorator) + assert not isinstance(wrapped_slicer, GlobalSubstreamCursor) + assert not isinstance(wrapped_slicer.wrapped_slicer, GlobalSubstreamCursor) + assert not isinstance(wrapped_slicer, PerPartitionWithGlobalCursor) + assert not isinstance(wrapped_slicer.wrapped_slicer, PerPartitionWithGlobalCursor) + assert not isinstance(wrapped_slicer, SubstreamPartitionRouter) + assert not isinstance(wrapped_slicer.wrapped_slicer, SubstreamPartitionRouter) + + assert isinstance(async_job_partition_router, AsyncJobPartitionRouter) + assert not isinstance(async_job_partition_router, StreamSlicerTestReadDecorator) + assert not isinstance(async_job_partition_router, GlobalSubstreamCursor) + assert not isinstance(async_job_partition_router, PerPartitionWithGlobalCursor) + assert not isinstance(async_job_partition_router, SubstreamPartitionRouter) + + +def test_isinstance_substream_partition_router(): + partition_router = create_substream_partition_router() + + wrapped_slicer = StreamSlicerTestReadDecorator( + wrapped_slicer=partition_router, + maximum_number_of_slices=5, + ) + + assert isinstance(wrapped_slicer, SubstreamPartitionRouter) + assert isinstance(wrapped_slicer.wrapped_slicer, SubstreamPartitionRouter) + assert isinstance(wrapped_slicer, StreamSlicerTestReadDecorator) + + assert not isinstance(wrapped_slicer.wrapped_slicer, StreamSlicerTestReadDecorator) + assert not isinstance(wrapped_slicer, GlobalSubstreamCursor) + assert not isinstance(wrapped_slicer.wrapped_slicer, GlobalSubstreamCursor) + assert not isinstance(wrapped_slicer, AsyncJobPartitionRouter) + assert not isinstance(wrapped_slicer.wrapped_slicer, AsyncJobPartitionRouter) + assert not isinstance(wrapped_slicer, PerPartitionWithGlobalCursor) + assert not isinstance(wrapped_slicer.wrapped_slicer, PerPartitionWithGlobalCursor) + + assert isinstance(partition_router, SubstreamPartitionRouter) + assert not isinstance(partition_router, StreamSlicerTestReadDecorator) + assert not isinstance(partition_router, GlobalSubstreamCursor) + assert not isinstance(partition_router, AsyncJobPartitionRouter) + assert not isinstance(partition_router, PerPartitionWithGlobalCursor) + + +def test_isinstance_perpartition_with_global_cursor(): + partition_router = create_substream_partition_router() + date_time_based_cursor = date_time_based_cursor_factory() + + cursor_factory = CursorFactory(date_time_based_cursor_factory) + substream_cursor = PerPartitionWithGlobalCursor( + cursor_factory=cursor_factory, + partition_router=partition_router, + stream_cursor=date_time_based_cursor, + ) + + wrapped_slicer = StreamSlicerTestReadDecorator( + wrapped_slicer=substream_cursor, + maximum_number_of_slices=5, + ) + + assert isinstance(wrapped_slicer, PerPartitionWithGlobalCursor) + assert isinstance(wrapped_slicer.wrapped_slicer, PerPartitionWithGlobalCursor) + assert isinstance(wrapped_slicer, StreamSlicerTestReadDecorator) + + assert not isinstance(wrapped_slicer.wrapped_slicer, StreamSlicerTestReadDecorator) + assert not isinstance(wrapped_slicer, GlobalSubstreamCursor) + assert not isinstance(wrapped_slicer.wrapped_slicer, GlobalSubstreamCursor) + assert not isinstance(wrapped_slicer, AsyncJobPartitionRouter) + assert not isinstance(wrapped_slicer.wrapped_slicer, AsyncJobPartitionRouter) + assert not isinstance(wrapped_slicer, SubstreamPartitionRouter) + assert not isinstance(wrapped_slicer.wrapped_slicer, SubstreamPartitionRouter) + + assert wrapped_slicer._per_partition_cursor._cursor_factory == cursor_factory + assert wrapped_slicer._partition_router == partition_router + assert wrapped_slicer._global_cursor._stream_cursor == date_time_based_cursor + + assert isinstance(substream_cursor, PerPartitionWithGlobalCursor) + assert not isinstance(substream_cursor, StreamSlicerTestReadDecorator) + assert not isinstance(substream_cursor, GlobalSubstreamCursor) + assert not isinstance(substream_cursor, AsyncJobPartitionRouter) + assert not isinstance(substream_cursor, SubstreamPartitionRouter) + + assert substream_cursor._per_partition_cursor._cursor_factory == cursor_factory + assert substream_cursor._partition_router == partition_router + assert substream_cursor._global_cursor._stream_cursor == date_time_based_cursor + + assert substream_cursor._get_active_cursor() == wrapped_slicer._get_active_cursor() + + +def test_slice_limiting_functionality(): + # Create a slicer that returns many slices + mock_slicer = Mock(spec=StreamSlicer) + mock_slicer.stream_slices.return_value = [ + StreamSlice(partition={f"key_{i}": f"value_{i}"}, cursor_slice={}) for i in range(10) + ] + + # Wrap with decorator limiting to 3 slices + wrapped_slicer = StreamSlicerTestReadDecorator( + wrapped_slicer=mock_slicer, + maximum_number_of_slices=3, + ) + + # Verify only 3 slices are returned + slices = list(wrapped_slicer.stream_slices()) + assert len(slices) == 3 + assert slices == mock_slicer.stream_slices.return_value[:3]