Skip to content

Commit 37516af

Browse files
set transform_before_filtering from manifest if client_side_incremental_sync=true
1 parent 3a9d54b commit 37516af

File tree

2 files changed

+72
-8
lines changed

2 files changed

+72
-8
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2827,13 +2827,9 @@ def create_record_selector(
28272827
else None
28282828
)
28292829

2830-
if model.transform_before_filtering is None:
2831-
# default to False if not set
2832-
model.transform_before_filtering = False
2833-
2834-
assert model.transform_before_filtering is not None # for mypy
2835-
2836-
transform_before_filtering = model.transform_before_filtering
2830+
transform_before_filtering = (
2831+
False if model.transform_before_filtering is None else model.transform_before_filtering
2832+
)
28372833
if client_side_incremental_sync:
28382834
record_filter = ClientSideIncrementalRecordFilterDecorator(
28392835
config=config,
@@ -2843,7 +2839,11 @@ def create_record_selector(
28432839
else None,
28442840
**client_side_incremental_sync,
28452841
)
2846-
transform_before_filtering = True
2842+
transform_before_filtering = (
2843+
True
2844+
if model.transform_before_filtering is None
2845+
else model.transform_before_filtering
2846+
)
28472847

28482848
if model.schema_normalization is None:
28492849
# default to no schema normalization if not set

unit_tests/sources/declarative/test_concurrent_declarative_source.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
33
#
44

5+
import pytest
56
import copy
67
import json
78
import math
@@ -1876,6 +1877,69 @@ def test_stream_using_is_client_side_incremental_has_cursor_state():
18761877
assert client_side_incremental_cursor_state == expected_cursor_value
18771878

18781879

1880+
@pytest.mark.parametrize(
1881+
"expected_transform_before_filtering",
1882+
[
1883+
pytest.param(
1884+
True,
1885+
id="transform before filtering",
1886+
),
1887+
pytest.param(
1888+
False,
1889+
id="transform after filtering",
1890+
),
1891+
pytest.param(
1892+
None,
1893+
id="default transform before filtering",
1894+
),
1895+
],
1896+
)
1897+
def test_stream_using_is_client_side_incremental_has_transform_before_filtering_according_to_manifest(
1898+
expected_transform_before_filtering,
1899+
):
1900+
expected_cursor_value = "2024-07-01"
1901+
state = [
1902+
AirbyteStateMessage(
1903+
type=AirbyteStateType.STREAM,
1904+
stream=AirbyteStreamState(
1905+
stream_descriptor=StreamDescriptor(name="locations", namespace=None),
1906+
stream_state=AirbyteStateBlob(updated_at=expected_cursor_value),
1907+
),
1908+
)
1909+
]
1910+
1911+
manifest_with_stream_state_interpolation = copy.deepcopy(_MANIFEST)
1912+
1913+
# Enable semi-incremental on the locations stream
1914+
manifest_with_stream_state_interpolation["definitions"]["locations_stream"]["incremental_sync"][
1915+
"is_client_side_incremental"
1916+
] = True
1917+
1918+
if expected_transform_before_filtering is not None:
1919+
manifest_with_stream_state_interpolation["definitions"]["locations_stream"]["retriever"][
1920+
"record_selector"
1921+
]["transform_before_filtering"] = expected_transform_before_filtering
1922+
1923+
source = ConcurrentDeclarativeSource(
1924+
source_config=manifest_with_stream_state_interpolation,
1925+
config=_CONFIG,
1926+
catalog=_CATALOG,
1927+
state=state,
1928+
)
1929+
concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG)
1930+
1931+
locations_stream = concurrent_streams[2]
1932+
assert isinstance(locations_stream, DefaultStream)
1933+
1934+
simple_retriever = locations_stream._stream_partition_generator._partition_factory._retriever
1935+
record_selector = simple_retriever.record_selector
1936+
1937+
if expected_transform_before_filtering is not None:
1938+
assert record_selector.transform_before_filtering == expected_transform_before_filtering
1939+
else:
1940+
assert record_selector.transform_before_filtering is True
1941+
1942+
18791943
def create_wrapped_stream(stream: DeclarativeStream) -> Stream:
18801944
slice_to_records_mapping = get_mocked_read_records_output(stream_name=stream.name)
18811945

0 commit comments

Comments
 (0)