Closed
Description
Issue
Because filtering is applied before transformations, a cursor field that is added by a transformation may not yet be present at the root level of the record when ClientSideIncrementalRecordFilterDecorator runs. As a result, incremental filtering cannot find the cursor value on those records.
Solution
To fix this:
- Check if any transformation adds the cursor field.
- If it does, execute the transformation before filtering to ensure the cursor field is available for filtering.
def filter_and_transform(
    self,
    all_data: Iterable[Mapping[str, Any]],
    stream_state: StreamState,
    records_schema: Mapping[str, Any],
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Iterable[Record]:
    """
    There is an issue with the selector as of 2024-08-30: it does technology-agnostic processing like filtering, transformation and
    normalization with an API that is technology-specific (as requests.Response is only for HTTP communication using the requests
    library).
    Until we decide to move this logic away from the selector, we made this method public so that users like AsyncJobRetriever could
    share the logic of doing transformations on a set of records.
    """
    filtered_data = self._filter(all_data, stream_state, stream_slice, next_page_token)
    transformed_data = self._transform(filtered_data, stream_state, stream_slice)
    normalized_data = self._normalize_by_schema(transformed_data, schema=records_schema)
    for data in normalized_data:
        yield Record(data=data, stream_name=self.name, associated_slice=stream_slice)
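The reordering proposed in the solution could look roughly like the standalone sketch below. It is not the CDK implementation: `AddFieldTransformation`, `adds_cursor_field`, the `field_name` attribute and the `lower_bound` comparison are hypothetical stand-ins, and the real check would have to inspect the actual transformation classes and use the decorator's own state comparison.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Iterable, Iterator, List, Mapping

RecordDict = Dict[str, Any]


@dataclass
class AddFieldTransformation:
    """Hypothetical transformation that adds one root-level field to a record."""

    field_name: str
    value_fn: Callable[[Mapping[str, Any]], Any]

    def transform(self, record: RecordDict) -> RecordDict:
        # Return a copy so the input record is left untouched.
        return {**record, self.field_name: self.value_fn(record)}


def adds_cursor_field(transformations: List[Any], cursor_field: str) -> bool:
    """Step 1: check whether any transformation adds the cursor field."""
    return any(getattr(t, "field_name", None) == cursor_field for t in transformations)


def filter_and_transform(
    records: Iterable[RecordDict],
    transformations: List[Any],
    cursor_field: str,
    lower_bound: Any,
) -> Iterator[RecordDict]:
    def transform(rs: Iterable[RecordDict]) -> Iterator[RecordDict]:
        for r in rs:
            for t in transformations:
                r = t.transform(r)
            yield r

    def cursor_filter(rs: Iterable[RecordDict]) -> Iterator[RecordDict]:
        # Simplified stand-in for the client-side incremental filter:
        # keep records whose root-level cursor value is at or after the bound.
        return (r for r in rs if r[cursor_field] >= lower_bound)

    if adds_cursor_field(transformations, cursor_field):
        # Step 2: the cursor only exists after transforming, so transform first.
        yield from cursor_filter(transform(records))
    else:
        # Otherwise keep the existing order: filter, then transform.
        yield from transform(cursor_filter(records))
```

Keeping the filter-first order as the default means streams whose cursor is already at the root level still avoid transforming records that will be discarded; only streams whose cursor is produced by a transformation pay the cost of transforming every record.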