Skip to content

Commit d2262a5

Browse files
maxi297 and octavia-squidington-iii
authored
chore: migrate client side filtering to concurrent cursor (#679)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent 2a16fe0 commit d2262a5

File tree

13 files changed

+430
-148
lines changed

13 files changed

+430
-148
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 3 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -468,34 +468,11 @@ def _is_concurrent_cursor_incremental_without_partition_routing(
468468
def _get_retriever(
469469
declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any]
470470
) -> Retriever:
471-
retriever = declarative_stream.retriever
472-
473-
# This is an optimization so that we don't invoke any cursor or state management flows within the
474-
# low-code framework because state management is handled through the ConcurrentCursor.
475-
if declarative_stream and isinstance(retriever, SimpleRetriever):
476-
# Also a temporary hack. In the legacy Stream implementation, as part of the read,
477-
# set_initial_state() is called to instantiate incoming state on the cursor. Although we no
478-
# longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components
479-
# like StopConditionPaginationStrategyDecorator still rely on a DatetimeBasedCursor that is
480-
# properly initialized with state.
481-
if retriever.cursor:
482-
retriever.cursor.set_initial_state(stream_state=stream_state)
483-
484-
# Similar to above, the ClientSideIncrementalRecordFilterDecorator cursor is a separate instance
485-
# from the one initialized on the SimpleRetriever, so it also must also have state initialized
486-
# for semi-incremental streams using is_client_side_incremental to filter properly
487-
if isinstance(retriever.record_selector, RecordSelector) and isinstance(
488-
retriever.record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator
489-
):
490-
retriever.record_selector.record_filter._cursor.set_initial_state(
491-
stream_state=stream_state
492-
) # type: ignore # After non-concurrent cursors are deprecated we can remove these cursor workarounds
493-
471+
if declarative_stream and isinstance(declarative_stream.retriever, SimpleRetriever):
494472
# We zero it out here, but since this is a cursor reference, the state is still properly
495473
# instantiated for the other components that reference it
496-
retriever.cursor = None
497-
498-
return retriever
474+
declarative_stream.retriever.cursor = None
475+
return declarative_stream.retriever
499476

500477
@staticmethod
501478
def _select_streams(

airbyte_cdk/sources/declarative/extractors/record_filter.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,8 @@
44
from dataclasses import InitVar, dataclass
55
from typing import Any, Iterable, Mapping, Optional, Union
66

7-
from airbyte_cdk.sources.declarative.incremental import (
8-
DatetimeBasedCursor,
9-
GlobalSubstreamCursor,
10-
PerPartitionWithGlobalCursor,
11-
)
127
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
8+
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
139
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
1410

1511

@@ -53,13 +49,13 @@ class ClientSideIncrementalRecordFilterDecorator(RecordFilter):
5349
"""
5450
Applies a filter to a list of records to exclude those that are older than the stream_state/start_date.
5551
56-
:param DatetimeBasedCursor date_time_based_cursor: Cursor used to extract datetime values
52+
:param Cursor cursor: Cursor used to filter out values
5753
:param PerPartitionCursor per_partition_cursor: Optional Cursor used for mapping cursor value in nested stream_state
5854
"""
5955

6056
def __init__(
6157
self,
62-
cursor: Union[DatetimeBasedCursor, PerPartitionWithGlobalCursor, GlobalSubstreamCursor],
58+
cursor: Union[Cursor],
6359
**kwargs: Any,
6460
):
6561
super().__init__(**kwargs)
@@ -77,7 +73,7 @@ def filter_records(
7773
for record in records
7874
if self._cursor.should_be_synced(
7975
# Record is created on the fly to align with cursors interface; stream name is ignored as we don't need it here
80-
# Record stream name is empty cause it is not used durig the filtering
76+
# Record stream name is empty because it is not used during the filtering
8177
Record(data=record, associated_slice=stream_slice, stream_name="")
8278
)
8379
)

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ def __init__(
8181
connector_state_converter: AbstractStreamStateConverter,
8282
cursor_field: CursorField,
8383
use_global_cursor: bool = False,
84+
attempt_to_create_cursor_if_not_provided: bool = False,
8485
) -> None:
8586
self._global_cursor: Optional[StreamState] = {}
8687
self._stream_name = stream_name
@@ -125,6 +126,9 @@ def __init__(
125126

126127
self._set_initial_state(stream_state)
127128

129+
# FIXME this is a temporary field the time of the migration from declarative cursors to concurrent ones
130+
self._attempt_to_create_cursor_if_not_provided = attempt_to_create_cursor_if_not_provided
131+
128132
@property
129133
def cursor_field(self) -> CursorField:
130134
return self._cursor_field
@@ -512,13 +516,28 @@ def _get_cursor(self, record: Record) -> ConcurrentCursor:
512516
raise ValueError(
513517
"Invalid state as stream slices that are emitted should refer to an existing cursor"
514518
)
519+
520+
if self._use_global_cursor:
521+
return self._create_cursor(
522+
self._global_cursor,
523+
self._lookback_window if self._global_cursor else 0,
524+
)
525+
515526
partition_key = self._to_partition_key(record.associated_slice.partition)
516-
if partition_key not in self._cursor_per_partition:
527+
if (
528+
partition_key not in self._cursor_per_partition
529+
and not self._attempt_to_create_cursor_if_not_provided
530+
):
517531
raise ValueError(
518532
"Invalid state as stream slices that are emitted should refer to an existing cursor"
519533
)
520-
cursor = self._cursor_per_partition[partition_key]
521-
return cursor
534+
elif partition_key not in self._cursor_per_partition:
535+
return self._create_cursor(
536+
self._global_cursor,
537+
self._lookback_window if self._global_cursor else 0,
538+
)
539+
else:
540+
return self._cursor_per_partition[partition_key]
522541

523542
def limit_reached(self) -> bool:
524543
return self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 80 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
)
3535
from airbyte_cdk.models import FailureType, Level
3636
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
37-
from airbyte_cdk.sources.declarative import transformations
3837
from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator
3938
from airbyte_cdk.sources.declarative.async_job.job_tracker import JobTracker
4039
from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository
@@ -604,7 +603,7 @@
604603
WeekClampingStrategy,
605604
Weekday,
606605
)
607-
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField
606+
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, Cursor, CursorField
608607
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
609608
CustomFormatConcurrentStreamStateConverter,
610609
DateTimeStreamStateConverter,
@@ -1475,6 +1474,7 @@ def create_concurrent_cursor_from_incrementing_count_cursor(
14751474
stream_namespace: Optional[str],
14761475
config: Config,
14771476
message_repository: Optional[MessageRepository] = None,
1477+
stream_state_migrations: Optional[List[Any]] = None,
14781478
**kwargs: Any,
14791479
) -> ConcurrentCursor:
14801480
# Per-partition incremental streams can dynamically create child cursors which will pass their current
@@ -1485,6 +1485,7 @@ def create_concurrent_cursor_from_incrementing_count_cursor(
14851485
if "stream_state" not in kwargs
14861486
else kwargs["stream_state"]
14871487
)
1488+
stream_state = self.apply_stream_state_migrations(stream_state_migrations, stream_state)
14881489

14891490
component_type = component_definition.get("type")
14901491
if component_definition.get("type") != model_type.__name__:
@@ -1561,6 +1562,7 @@ def create_concurrent_cursor_from_perpartition_cursor(
15611562
stream_state: MutableMapping[str, Any],
15621563
partition_router: PartitionRouter,
15631564
stream_state_migrations: Optional[List[Any]] = None,
1565+
attempt_to_create_cursor_if_not_provided: bool = False,
15641566
**kwargs: Any,
15651567
) -> ConcurrentPerPartitionCursor:
15661568
component_type = component_definition.get("type")
@@ -1631,6 +1633,7 @@ def create_concurrent_cursor_from_perpartition_cursor(
16311633
connector_state_converter=connector_state_converter,
16321634
cursor_field=cursor_field,
16331635
use_global_cursor=use_global_cursor,
1636+
attempt_to_create_cursor_if_not_provided=attempt_to_create_cursor_if_not_provided,
16341637
)
16351638

16361639
@staticmethod
@@ -1931,30 +1934,17 @@ def create_declarative_stream(
19311934
and hasattr(model.incremental_sync, "is_data_feed")
19321935
and model.incremental_sync.is_data_feed
19331936
)
1934-
client_side_incremental_sync = None
1935-
if (
1937+
client_side_filtering_enabled = (
19361938
model.incremental_sync
19371939
and hasattr(model.incremental_sync, "is_client_side_incremental")
19381940
and model.incremental_sync.is_client_side_incremental
1939-
):
1940-
supported_slicers = (
1941-
DatetimeBasedCursor,
1942-
GlobalSubstreamCursor,
1943-
PerPartitionWithGlobalCursor,
1944-
)
1945-
if combined_slicers and not isinstance(combined_slicers, supported_slicers):
1946-
raise ValueError(
1947-
"Unsupported Slicer is used. PerPartitionWithGlobalCursor should be used here instead"
1948-
)
1949-
cursor = (
1950-
combined_slicers
1951-
if isinstance(
1952-
combined_slicers, (PerPartitionWithGlobalCursor, GlobalSubstreamCursor)
1953-
)
1954-
else self._create_component_from_model(model=model.incremental_sync, config=config)
1941+
)
1942+
concurrent_cursor = None
1943+
if stop_condition_on_cursor or client_side_filtering_enabled:
1944+
stream_slicer = self._build_stream_slicer_from_partition_router(
1945+
model.retriever, config, stream_name=model.name
19551946
)
1956-
1957-
client_side_incremental_sync = {"cursor": cursor}
1947+
concurrent_cursor = self._build_concurrent_cursor(model, stream_slicer, config)
19581948

19591949
if model.incremental_sync and isinstance(model.incremental_sync, DatetimeBasedCursorModel):
19601950
cursor_model = model.incremental_sync
@@ -2029,8 +2019,10 @@ def create_declarative_stream(
20292019
primary_key=primary_key,
20302020
stream_slicer=combined_slicers,
20312021
request_options_provider=request_options_provider,
2032-
stop_condition_on_cursor=stop_condition_on_cursor,
2033-
client_side_incremental_sync=client_side_incremental_sync,
2022+
stop_condition_cursor=concurrent_cursor,
2023+
client_side_incremental_sync={"cursor": concurrent_cursor}
2024+
if client_side_filtering_enabled
2025+
else None,
20342026
transformations=transformations,
20352027
file_uploader=file_uploader,
20362028
incremental_sync=model.incremental_sync,
@@ -2185,6 +2177,67 @@ def _build_incremental_cursor(
21852177
return self._create_component_from_model(model=model.incremental_sync, config=config) # type: ignore[no-any-return] # Will be created Cursor as stream_slicer_model is model.incremental_sync
21862178
return None
21872179

2180+
def _build_concurrent_cursor(
2181+
self,
2182+
model: DeclarativeStreamModel,
2183+
stream_slicer: Optional[PartitionRouter],
2184+
config: Config,
2185+
) -> Optional[StreamSlicer]:
2186+
stream_state = self._connector_state_manager.get_stream_state(
2187+
stream_name=model.name or "", namespace=None
2188+
)
2189+
2190+
if model.incremental_sync and stream_slicer:
2191+
# FIXME there is a discrepancy where this logic is applied on the create_*_cursor methods for
2192+
# ConcurrentCursor but it is applied outside of create_concurrent_cursor_from_perpartition_cursor
2193+
if model.state_migrations:
2194+
state_transformations = [
2195+
self._create_component_from_model(
2196+
state_migration, config, declarative_stream=model
2197+
)
2198+
for state_migration in model.state_migrations
2199+
]
2200+
else:
2201+
state_transformations = []
2202+
2203+
return self.create_concurrent_cursor_from_perpartition_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
2204+
state_manager=self._connector_state_manager,
2205+
model_type=DatetimeBasedCursorModel,
2206+
component_definition=model.incremental_sync.__dict__,
2207+
stream_name=model.name or "",
2208+
stream_namespace=None,
2209+
config=config or {},
2210+
stream_state=stream_state,
2211+
stream_state_migrations=state_transformations,
2212+
partition_router=stream_slicer,
2213+
attempt_to_create_cursor_if_not_provided=True,
2214+
)
2215+
elif model.incremental_sync:
2216+
if type(model.incremental_sync) == IncrementingCountCursorModel:
2217+
return self.create_concurrent_cursor_from_incrementing_count_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
2218+
model_type=IncrementingCountCursorModel,
2219+
component_definition=model.incremental_sync.__dict__,
2220+
stream_name=model.name or "",
2221+
stream_namespace=None,
2222+
config=config or {},
2223+
stream_state_migrations=model.state_migrations,
2224+
)
2225+
elif type(model.incremental_sync) == DatetimeBasedCursorModel:
2226+
return self.create_concurrent_cursor_from_datetime_based_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
2227+
model_type=type(model.incremental_sync),
2228+
component_definition=model.incremental_sync.__dict__,
2229+
stream_name=model.name or "",
2230+
stream_namespace=None,
2231+
config=config or {},
2232+
stream_state_migrations=model.state_migrations,
2233+
attempt_to_create_cursor_if_not_provided=True,
2234+
)
2235+
else:
2236+
raise ValueError(
2237+
f"Incremental sync of type {type(model.incremental_sync)} is not supported"
2238+
)
2239+
return None
2240+
21882241
def _build_resumable_cursor(
21892242
self,
21902243
model: Union[
@@ -2285,7 +2338,7 @@ def create_default_paginator(
22852338
url_base: str,
22862339
extractor_model: Optional[Union[CustomRecordExtractorModel, DpathExtractorModel]] = None,
22872340
decoder: Optional[Decoder] = None,
2288-
cursor_used_for_stop_condition: Optional[DeclarativeCursor] = None,
2341+
cursor_used_for_stop_condition: Optional[Cursor] = None,
22892342
) -> Union[DefaultPaginator, PaginatorTestReadDecorator]:
22902343
if decoder:
22912344
if self._is_supported_decoder_for_pagination(decoder):
@@ -3146,7 +3199,7 @@ def create_simple_retriever(
31463199
primary_key: Optional[Union[str, List[str], List[List[str]]]],
31473200
stream_slicer: Optional[StreamSlicer],
31483201
request_options_provider: Optional[RequestOptionsProvider] = None,
3149-
stop_condition_on_cursor: bool = False,
3202+
stop_condition_cursor: Optional[Cursor] = None,
31503203
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
31513204
transformations: List[RecordTransformation],
31523205
file_uploader: Optional[DefaultFileUploader] = None,
@@ -3277,15 +3330,14 @@ def _get_url() -> str:
32773330
),
32783331
)
32793332

3280-
cursor_used_for_stop_condition = cursor if stop_condition_on_cursor else None
32813333
paginator = (
32823334
self._create_component_from_model(
32833335
model=model.paginator,
32843336
config=config,
32853337
url_base=_get_url(),
32863338
extractor_model=model.record_selector.extractor,
32873339
decoder=decoder,
3288-
cursor_used_for_stop_condition=cursor_used_for_stop_condition,
3340+
cursor_used_for_stop_condition=stop_condition_cursor or None,
32893341
)
32903342
if model.paginator
32913343
else NoPagination(parameters={})

airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,10 @@
77

88
import requests
99

10-
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
1110
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import (
1211
PaginationStrategy,
1312
)
14-
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor
13+
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
1514
from airbyte_cdk.sources.types import Record
1615

1716

@@ -29,8 +28,7 @@ def is_met(self, record: Record) -> bool:
2928
class CursorStopCondition(PaginationStopCondition):
3029
def __init__(
3130
self,
32-
cursor: DeclarativeCursor
33-
| ConcurrentCursor, # migrate to use both old and concurrent versions
31+
cursor: Cursor,
3432
):
3533
self._cursor = cursor
3634

airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,3 +311,6 @@ def set_initial_state(self, value: StreamState) -> None:
311311

312312
def ensure_at_least_one_state_emitted(self) -> None:
313313
self.emit_state_message()
314+
315+
def should_be_synced(self, record: Record) -> bool:
316+
return True

airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_final_state_cursor.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,3 +81,6 @@ def ensure_at_least_one_state_emitted(self) -> None:
8181
self._stream_name, self._stream_namespace
8282
)
8383
self._message_repository.emit_message(state_message)
84+
85+
def should_be_synced(self, record: Record) -> bool:
86+
return True

0 commit comments

Comments
 (0)