
Commit 522caab

fix(low code): Fix missing cursor for ClientSideIncrementalRecordFilterDecorator (#334)
1 parent fdcded3 commit 522caab

4 files changed: +109 −4 lines changed


airbyte_cdk/sources/declarative/extractors/record_selector.py

Lines changed: 12 additions & 3 deletions
@@ -41,6 +41,7 @@ class RecordSelector(HttpSelector):
     _name: Union[InterpolatedString, str] = field(init=False, repr=False, default="")
     record_filter: Optional[RecordFilter] = None
     transformations: List[RecordTransformation] = field(default_factory=lambda: [])
+    transform_before_filtering: bool = False
 
     def __post_init__(self, parameters: Mapping[str, Any]) -> None:
         self._parameters = parameters
@@ -104,9 +105,17 @@ def filter_and_transform(
         Until we decide to move this logic away from the selector, we made this method public so that users like AsyncJobRetriever could
         share the logic of doing transformations on a set of records.
        """
-        filtered_data = self._filter(all_data, stream_state, stream_slice, next_page_token)
-        transformed_data = self._transform(filtered_data, stream_state, stream_slice)
-        normalized_data = self._normalize_by_schema(transformed_data, schema=records_schema)
+        if self.transform_before_filtering:
+            transformed_data = self._transform(all_data, stream_state, stream_slice)
+            transformed_filtered_data = self._filter(
+                transformed_data, stream_state, stream_slice, next_page_token
+            )
+        else:
+            filtered_data = self._filter(all_data, stream_state, stream_slice, next_page_token)
+            transformed_filtered_data = self._transform(filtered_data, stream_state, stream_slice)
+        normalized_data = self._normalize_by_schema(
+            transformed_filtered_data, schema=records_schema
+        )
         for data in normalized_data:
             yield Record(data=data, stream_name=self.name, associated_slice=stream_slice)

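Why the ordering matters: a client-side incremental filter compares each record's cursor field against saved state, so any transformation that populates or normalizes that field has to run first. A minimal sketch of the two pipeline orders, using plain stand-in functions rather than the real CDK classes (the record shape, field names, and cutoff date here are illustrative only):

# Hypothetical records: record 1 is missing its cursor value until a
# transformation fills it in.
records = [{"id": 1, "updated_at": None}, {"id": 2, "updated_at": "2024-01-02"}]

def transform(rows):
    # Stand-in for the selector's _transform step: fill in the cursor field.
    return [{**row, "updated_at": row["updated_at"] or "2024-01-01"} for row in rows]

def keep(row):
    # Stand-in for a client-side incremental filter: keep records at or
    # after the saved-state cutoff.
    return row["updated_at"] is not None and row["updated_at"] >= "2024-01-01"

# transform_before_filtering=True: the filter sees transformed records.
assert [row["id"] for row in filter(keep, transform(records))] == [1, 2]

# transform_before_filtering=False: record 1 is dropped before its cursor
# field is ever populated.
assert [row["id"] for row in transform(list(filter(keep, records)))] == [2]
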
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 5 additions & 0 deletions
@@ -2415,6 +2415,8 @@ def create_record_selector(
             if model.record_filter
             else None
         )
+
+        transform_before_filtering = False
         if client_side_incremental_sync:
             record_filter = ClientSideIncrementalRecordFilterDecorator(
                 config=config,
@@ -2424,6 +2426,8 @@ def create_record_selector(
                 else None,
                 **client_side_incremental_sync,
             )
+            transform_before_filtering = True
+
         schema_normalization = (
             TypeTransformer(SCHEMA_TRANSFORMER_TYPE_MAPPING[model.schema_normalization])
             if isinstance(model.schema_normalization, SchemaNormalizationModel)
@@ -2438,6 +2442,7 @@ def create_record_selector(
             transformations=transformations or [],
             schema_normalization=schema_normalization,
             parameters=model.parameters or {},
+            transform_before_filtering=transform_before_filtering,
         )
 
     @staticmethod

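For context, the decorator the factory wires in filters records through the stream's cursor. A rough sketch of that behavior, assumed from the class and attribute names in this commit (including a should_be_synced-style cursor check, which is an assumption; the real implementation lives in airbyte_cdk/sources/declarative/extractors/record_filter.py):

class ClientSideIncrementalFilterSketch:
    """Rough stand-in for ClientSideIncrementalRecordFilterDecorator, not the CDK class."""

    def __init__(self, cursor):
        # The tests below assert the real decorator holds a cursor in `_cursor`
        # (e.g. PerPartitionWithGlobalCursor).
        self._cursor = cursor

    def filter_records(self, records, stream_state, stream_slice, next_page_token=None):
        # Assumed behavior: keep only records the cursor considers newer than
        # the saved state. Because this reads each record's cursor field,
        # transformations that produce that field must run first -- which is
        # why the factory sets transform_before_filtering=True here.
        return (record for record in records if self._cursor.should_be_synced(record))
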
unit_tests/sources/declarative/extractors/test_record_selector.py

Lines changed: 89 additions & 1 deletion
@@ -3,7 +3,7 @@
 #
 
 import json
-from unittest.mock import Mock, call
+from unittest.mock import MagicMock, Mock, call
 
 import pytest
 import requests
@@ -220,3 +220,91 @@ def create_schema():
         "field_float": {"type": "number"},
     },
 }
+
+
+@pytest.mark.parametrize("transform_before_filtering", [True, False])
+def test_transform_before_filtering(transform_before_filtering):
+    """
+    Verify that when transform_before_filtering=True, records are modified before
+    filtering. When False, the filter sees the original record data first.
+    """
+
+    # 1) Our response body with 'myfield' set differently
+    #    The first record has myfield=0 (needs transformation to pass)
+    #    The second record has myfield=999 (already passes the filter)
+    body = {"data": [{"id": 1, "myfield": 0}, {"id": 2, "myfield": 999}]}
+
+    # 2) A response object
+    response = requests.Response()
+    response._content = json.dumps(body).encode("utf-8")
+
+    # 3) A simple extractor pulling records from 'data'
+    extractor = DpathExtractor(
+        field_path=["data"], decoder=JsonDecoder(parameters={}), config={}, parameters={}
+    )
+
+    # 4) A filter that keeps only records whose 'myfield' == 999
+    #    i.e.: "{{ record['myfield'] == 999 }}"
+    record_filter = RecordFilter(
+        config={},
+        condition="{{ record['myfield'] == 999 }}",
+        parameters={},
+    )
+
+    # 5) A transformation that sets 'myfield' to 999
+    #    We'll attach it to a mock so we can confirm how many times it was called
+    transformation_mock = MagicMock(spec=RecordTransformation)
+
+    def transformation_side_effect(record, config, stream_state, stream_slice):
+        record["myfield"] = 999
+
+    transformation_mock.transform.side_effect = transformation_side_effect
+
+    # 6) Create a RecordSelector with transform_before_filtering set from our param
+    record_selector = RecordSelector(
+        extractor=extractor,
+        config={},
+        name="test_stream",
+        record_filter=record_filter,
+        transformations=[transformation_mock],
+        schema_normalization=TypeTransformer(TransformConfig.NoTransform),
+        transform_before_filtering=transform_before_filtering,
+        parameters={},
+    )
+
+    # 7) Collect records
+    stream_slice = StreamSlice(partition={}, cursor_slice={})
+    actual_records = list(
+        record_selector.select_records(
+            response=response,
+            records_schema={},  # not using schema in this test
+            stream_state={},
+            stream_slice=stream_slice,
+            next_page_token=None,
+        )
+    )
+
+    # 8) Assert how many records survive
+    if transform_before_filtering:
+        # Both records become myfield=999 BEFORE the filter => both pass
+        assert len(actual_records) == 2
+        # The transformation should be called 2x (once per record)
+        assert transformation_mock.transform.call_count == 2
+    else:
+        # The first record is myfield=0 when the filter sees it => filter excludes it
+        # The second record is myfield=999 => filter includes it
+        assert len(actual_records) == 1
+        # The transformation occurs only on that single surviving record
+        # (the filter is done first, so the first record is already dropped)
+        assert transformation_mock.transform.call_count == 1
+
+    # 9) Check final record data
+    #    If transform_before_filtering=True => we have records [1,2] both with myfield=999
+    #    If transform_before_filtering=False => we have record [2] with myfield=999
+    final_record_data = [r.data for r in actual_records]
+    if transform_before_filtering:
+        assert all(record["myfield"] == 999 for record in final_record_data)
+        assert sorted([r["id"] for r in final_record_data]) == [1, 2]
+    else:
+        assert final_record_data[0]["id"] == 2
+        assert final_record_data[0]["myfield"] == 999

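A note on the mocking pattern above: MagicMock(spec=RecordTransformation) enforces the transformation interface while the side_effect mutates the record in place, letting one mock verify both call counts and transformed values. A minimal standalone illustration of the same pattern (the Transformer class here is hypothetical, not a CDK type):

from unittest.mock import MagicMock

class Transformer:
    def transform(self, record):
        ...

mock = MagicMock(spec=Transformer)  # spec= rejects calls to nonexistent methods
mock.transform.side_effect = lambda record: record.update(value=999)

record = {"value": 0}
mock.transform(record)  # mutates the dict in place via the side effect
assert record == {"value": 999}
assert mock.transform.call_count == 1
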
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 3 additions & 0 deletions
@@ -1194,6 +1194,8 @@ def test_client_side_incremental():
         stream.retriever.record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator
     )
 
+    assert stream.retriever.record_selector.transform_before_filtering == True
+
 
 def test_client_side_incremental_with_partition_router():
     content = """
@@ -1274,6 +1276,7 @@ def test_client_side_incremental_with_partition_router():
     assert isinstance(
         stream.retriever.record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator
     )
+    assert stream.retriever.record_selector.transform_before_filtering == True
     assert isinstance(
         stream.retriever.record_selector.record_filter._cursor,
         PerPartitionWithGlobalCursor,

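To run just the tests touched by this commit, one option (assuming pytest is installed and the working directory is the repository root) is:

import pytest

# The -k expression matches test_transform_before_filtering plus the two
# client-side incremental factory tests updated in this commit.
raise SystemExit(
    pytest.main(
        [
            "unit_tests/sources/declarative/extractors/test_record_selector.py",
            "unit_tests/sources/declarative/parsers/test_model_to_component_factory.py",
            "-k",
            "transform_before_filtering or client_side_incremental",
        ]
    )
)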