Skip to content

Commit f616450

Browse files
committed
Properly call set_initial_state() on the cursor that is initialized on the ClientSideIncrementalRecordFilterDecorator instance
1 parent ef97304 commit f616450

File tree

2 files changed

+54
-2
lines changed

2 files changed

+54
-2
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -475,10 +475,21 @@ def _get_retriever(
475475
# Also a temporary hack. In the legacy Stream implementation, as part of the read,
476476
# set_initial_state() is called to instantiate incoming state on the cursor. Although we no
477477
# longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components
478-
# like StopConditionPaginationStrategyDecorator and ClientSideIncrementalRecordFilterDecorator
479-
# still rely on a DatetimeBasedCursor that is properly initialized with state.
478+
# like StopConditionPaginationStrategyDecorator still rely on a DatetimeBasedCursor that is
479+
# properly initialized with state.
480480
if retriever.cursor:
481481
retriever.cursor.set_initial_state(stream_state=stream_state)
482+
483+
# Similar to above, the ClientSideIncrementalRecordFilterDecorator cursor is a separate instance
484+
# from the one initialized on the SimpleRetriever, so it also must also have state initialized
485+
# for semi-incremental streams using is_client_side_incremental to filter properly
486+
if isinstance(retriever.record_selector, RecordSelector) and isinstance(
487+
retriever.record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator
488+
):
489+
retriever.record_selector.record_filter._cursor.set_initial_state(
490+
stream_state=stream_state
491+
) # type: ignore # After non-concurrent cursors are deprecated we can remove these cursor workarounds
492+
482493
# We zero it out here, but since this is a cursor reference, the state is still properly
483494
# instantiated for the other components that reference it
484495
retriever.cursor = None

unit_tests/sources/declarative/test_concurrent_declarative_source.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232
ConcurrentDeclarativeSource,
3333
)
3434
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
35+
from airbyte_cdk.sources.declarative.extractors.record_filter import (
36+
ClientSideIncrementalRecordFilterDecorator,
37+
)
3538
from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter
3639
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
3740
StreamSlicerPartitionGenerator,
@@ -1647,6 +1650,44 @@ def test_async_incremental_stream_uses_concurrent_cursor_with_state():
16471650
assert async_job_partition_router.stream_slicer._concurrent_state == expected_state
16481651

16491652

1653+
def test_stream_using_is_client_side_incremental_has_cursor_state():
1654+
expected_cursor_value = "2024-07-01"
1655+
state = [
1656+
AirbyteStateMessage(
1657+
type=AirbyteStateType.STREAM,
1658+
stream=AirbyteStreamState(
1659+
stream_descriptor=StreamDescriptor(name="locations", namespace=None),
1660+
stream_state=AirbyteStateBlob(updated_at=expected_cursor_value),
1661+
),
1662+
)
1663+
]
1664+
1665+
manifest_with_stream_state_interpolation = copy.deepcopy(_MANIFEST)
1666+
1667+
# Enable semi-incremental on the locations stream
1668+
manifest_with_stream_state_interpolation["definitions"]["locations_stream"]["incremental_sync"][
1669+
"is_client_side_incremental"
1670+
] = True
1671+
1672+
source = ConcurrentDeclarativeSource(
1673+
source_config=manifest_with_stream_state_interpolation,
1674+
config=_CONFIG,
1675+
catalog=_CATALOG,
1676+
state=state,
1677+
)
1678+
concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG)
1679+
1680+
locations_stream = concurrent_streams[2]
1681+
assert isinstance(locations_stream, DefaultStream)
1682+
1683+
simple_retriever = locations_stream._stream_partition_generator._partition_factory._retriever
1684+
record_filter = simple_retriever.record_selector.record_filter
1685+
assert isinstance(record_filter, ClientSideIncrementalRecordFilterDecorator)
1686+
client_side_incremental_cursor_state = record_filter._cursor._cursor
1687+
1688+
assert client_side_incremental_cursor_state == expected_cursor_value
1689+
1690+
16501691
def create_wrapped_stream(stream: DeclarativeStream) -> Stream:
16511692
slice_to_records_mapping = get_mocked_read_records_output(stream_name=stream.name)
16521693

0 commit comments

Comments
 (0)