diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 4d4cd9440..7feca1980 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2827,13 +2827,9 @@ def create_record_selector( else None ) - if model.transform_before_filtering is None: - # default to False if not set - model.transform_before_filtering = False - - assert model.transform_before_filtering is not None # for mypy - - transform_before_filtering = model.transform_before_filtering + transform_before_filtering = ( + False if model.transform_before_filtering is None else model.transform_before_filtering + ) if client_side_incremental_sync: record_filter = ClientSideIncrementalRecordFilterDecorator( config=config, @@ -2843,7 +2839,11 @@ def create_record_selector( else None, **client_side_incremental_sync, ) - transform_before_filtering = True + transform_before_filtering = ( + True + if model.transform_before_filtering is None + else model.transform_before_filtering + ) if model.schema_normalization is None: # default to no schema normalization if not set diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 6af69eac5..148633014 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -11,6 +11,7 @@ import freezegun import isodate +import pytest from typing_extensions import deprecated from airbyte_cdk.models import ( @@ -1876,6 +1877,69 @@ def test_stream_using_is_client_side_incremental_has_cursor_state(): assert client_side_incremental_cursor_state == expected_cursor_value +@pytest.mark.parametrize( + "expected_transform_before_filtering", + [ + pytest.param( + True, + id="transform before filtering", + ), + pytest.param( + False, + id="transform after filtering", + ), + pytest.param( + None, + id="default transform before filtering", + ), + ], +) +def test_stream_using_is_client_side_incremental_has_transform_before_filtering_according_to_manifest( + expected_transform_before_filtering, +): + expected_cursor_value = "2024-07-01" + state = [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="locations", namespace=None), + stream_state=AirbyteStateBlob(updated_at=expected_cursor_value), + ), + ) + ] + + manifest_with_stream_state_interpolation = copy.deepcopy(_MANIFEST) + + # Enable semi-incremental on the locations stream + manifest_with_stream_state_interpolation["definitions"]["locations_stream"]["incremental_sync"][ + "is_client_side_incremental" + ] = True + + if expected_transform_before_filtering is not None: + manifest_with_stream_state_interpolation["definitions"]["locations_stream"]["retriever"][ + "record_selector" + ]["transform_before_filtering"] = expected_transform_before_filtering + + source = ConcurrentDeclarativeSource( + source_config=manifest_with_stream_state_interpolation, + config=_CONFIG, + catalog=_CATALOG, + state=state, + ) + concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG) + + locations_stream = concurrent_streams[2] + assert isinstance(locations_stream, DefaultStream) + + simple_retriever = locations_stream._stream_partition_generator._partition_factory._retriever + record_selector = simple_retriever.record_selector + + if expected_transform_before_filtering is not None: + assert record_selector.transform_before_filtering == expected_transform_before_filtering + else: + assert record_selector.transform_before_filtering is True + + def create_wrapped_stream(stream: DeclarativeStream) -> Stream: slice_to_records_mapping = get_mocked_read_records_output(stream_name=stream.name)