Skip to content

Commit 5c0c12b

Browse files
committed
[ISSUE #10550] have streams without partition routers nor cursor run concurrently
1 parent e27cb81 commit 5c0c12b

File tree

4 files changed

+69
-80
lines changed

4 files changed

+69
-80
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
4949
AlwaysAvailableAvailabilityStrategy,
5050
)
51+
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, FinalStateCursor
5152
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
5253
from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream
5354

@@ -189,26 +190,18 @@ def _group_streams(
189190
# these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
190191
# so we need to treat them as synchronous
191192
if isinstance(declarative_stream, DeclarativeStream):
192-
datetime_based_cursor_component_definition = name_to_stream_mapping[
193-
declarative_stream.name
194-
].get("incremental_sync")
193+
datetime_based_cursor_component_definition = name_to_stream_mapping[declarative_stream.name].get("incremental_sync")
195194

196-
if (
197-
datetime_based_cursor_component_definition
198-
and datetime_based_cursor_component_definition.get("type", "")
199-
== DatetimeBasedCursorModel.__name__
200-
and self._stream_supports_concurrent_partition_processing(
201-
declarative_stream=declarative_stream
202-
)
203-
and hasattr(declarative_stream.retriever, "stream_slicer")
204-
and isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
205-
):
195+
is_without_partition_router_nor_cursor = not bool(datetime_based_cursor_component_definition) and not(name_to_stream_mapping[declarative_stream.name].get("retriever", {}).get("partition_router"))
196+
is_datetime_incremental_with_partition_routing = self._is_datetime_incremental_with_partition_routing(datetime_based_cursor_component_definition,
197+
declarative_stream)
198+
if is_without_partition_router_nor_cursor or is_datetime_incremental_with_partition_routing:
206199
stream_state = state_manager.get_stream_state(
207200
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
208201
)
209202

210-
cursor, connector_state_converter = (
211-
self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
203+
if is_datetime_incremental_with_partition_routing:
204+
cursor: Cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
212205
state_manager=state_manager,
213206
model_type=DatetimeBasedCursorModel,
214207
component_definition=datetime_based_cursor_component_definition,
@@ -217,7 +210,8 @@ def _group_streams(
217210
config=config or {},
218211
stream_state=stream_state,
219212
)
220-
)
213+
else:
214+
cursor = FinalStateCursor(declarative_stream.name, declarative_stream.namespace, self.message_repository)
221215

222216
partition_generator = StreamSlicerPartitionGenerator(
223217
DeclarativePartitionFactory(
@@ -240,7 +234,7 @@ def _group_streams(
240234
json_schema=declarative_stream.get_json_schema(),
241235
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
242236
primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
243-
cursor_field=cursor.cursor_field.cursor_field_key,
237+
cursor_field=cursor.cursor_field.cursor_field_key if hasattr(cursor, "cursor_field") else None,
244238
logger=self.logger,
245239
cursor=cursor,
246240
)
@@ -252,6 +246,9 @@ def _group_streams(
252246

253247
return concurrent_streams, synchronous_streams
254248

249+
def _is_datetime_incremental_with_partition_routing(self, datetime_based_cursor_component_definition: Mapping[str, Any], declarative_stream: DeclarativeStream) -> bool:
250+
return bool(datetime_based_cursor_component_definition) and datetime_based_cursor_component_definition.get("type", "") == DatetimeBasedCursorModel.__name__ and self._stream_supports_concurrent_partition_processing(declarative_stream=declarative_stream) and hasattr(declarative_stream.retriever, "stream_slicer") and isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
251+
255252
def _stream_supports_concurrent_partition_processing(
256253
self, declarative_stream: DeclarativeStream
257254
) -> bool:

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -753,7 +753,7 @@ def create_concurrent_cursor_from_datetime_based_cursor(
753753
config: Config,
754754
stream_state: MutableMapping[str, Any],
755755
**kwargs: Any,
756-
) -> Tuple[ConcurrentCursor, DateTimeStreamStateConverter]:
756+
) -> ConcurrentCursor:
757757
component_type = component_definition.get("type")
758758
if component_definition.get("type") != model_type.__name__:
759759
raise ValueError(
@@ -884,23 +884,20 @@ def create_concurrent_cursor_from_datetime_based_cursor(
884884
if evaluated_step:
885885
step_length = parse_duration(evaluated_step)
886886

887-
return (
888-
ConcurrentCursor(
889-
stream_name=stream_name,
890-
stream_namespace=stream_namespace,
891-
stream_state=stream_state,
892-
message_repository=self._message_repository, # type: ignore # message_repository is always instantiated with a value by factory
893-
connector_state_manager=state_manager,
894-
connector_state_converter=connector_state_converter,
895-
cursor_field=cursor_field,
896-
slice_boundary_fields=slice_boundary_fields,
897-
start=start_date, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
898-
end_provider=end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
899-
lookback_window=lookback_window,
900-
slice_range=step_length,
901-
cursor_granularity=cursor_granularity,
902-
),
903-
connector_state_converter,
887+
return ConcurrentCursor(
888+
stream_name=stream_name,
889+
stream_namespace=stream_namespace,
890+
stream_state=stream_state,
891+
message_repository=self._message_repository, # type: ignore # message_repository is always instantiated with a value by factory
892+
connector_state_manager=state_manager,
893+
connector_state_converter=connector_state_converter,
894+
cursor_field=cursor_field,
895+
slice_boundary_fields=slice_boundary_fields,
896+
start=start_date, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
897+
end_provider=end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
898+
lookback_window=lookback_window,
899+
slice_range=step_length,
900+
cursor_granularity=cursor_granularity,
904901
)
905902

906903
@staticmethod

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 25 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3055,16 +3055,14 @@ def test_create_concurrent_cursor_from_datetime_based_cursor_all_fields(
30553055
"lookback_window": "P3D",
30563056
}
30573057

3058-
concurrent_cursor, stream_state_converter = (
3059-
connector_builder_factory.create_concurrent_cursor_from_datetime_based_cursor(
3060-
state_manager=connector_state_manager,
3061-
model_type=DatetimeBasedCursorModel,
3062-
component_definition=cursor_component_definition,
3063-
stream_name=stream_name,
3064-
stream_namespace=None,
3065-
config=config,
3066-
stream_state=stream_state,
3067-
)
3058+
concurrent_cursor = connector_builder_factory.create_concurrent_cursor_from_datetime_based_cursor(
3059+
state_manager=connector_state_manager,
3060+
model_type=DatetimeBasedCursorModel,
3061+
component_definition=cursor_component_definition,
3062+
stream_name=stream_name,
3063+
stream_namespace=None,
3064+
config=config,
3065+
stream_state=stream_state,
30683066
)
30693067

30703068
assert concurrent_cursor._stream_name == stream_name
@@ -3087,6 +3085,7 @@ def test_create_concurrent_cursor_from_datetime_based_cursor_all_fields(
30873085
assert concurrent_cursor._end_provider() == expected_end
30883086
assert concurrent_cursor._concurrent_state == expected_concurrent_state
30893087

3088+
stream_state_converter = concurrent_cursor._connector_state_converter
30903089
assert isinstance(stream_state_converter, CustomFormatConcurrentStreamStateConverter)
30913090
assert stream_state_converter._datetime_format == expected_datetime_format
30923091
assert stream_state_converter._is_sequential_state
@@ -3187,16 +3186,14 @@ def test_create_concurrent_cursor_from_datetime_based_cursor(
31873186
stream_state={},
31883187
)
31893188
else:
3190-
concurrent_cursor, stream_state_converter = (
3191-
connector_builder_factory.create_concurrent_cursor_from_datetime_based_cursor(
3192-
state_manager=connector_state_manager,
3193-
model_type=DatetimeBasedCursorModel,
3194-
component_definition=cursor_component_definition,
3195-
stream_name=stream_name,
3196-
stream_namespace=None,
3197-
config=config,
3198-
stream_state={},
3199-
)
3189+
concurrent_cursor = connector_builder_factory.create_concurrent_cursor_from_datetime_based_cursor(
3190+
state_manager=connector_state_manager,
3191+
model_type=DatetimeBasedCursorModel,
3192+
component_definition=cursor_component_definition,
3193+
stream_name=stream_name,
3194+
stream_namespace=None,
3195+
config=config,
3196+
stream_state={},
32003197
)
32013198

32023199
assert getattr(concurrent_cursor, assertion_field) == expected_value
@@ -3244,16 +3241,14 @@ def test_create_concurrent_cursor_uses_min_max_datetime_format_if_defined():
32443241
"lookback_window": "P3D",
32453242
}
32463243

3247-
concurrent_cursor, stream_state_converter = (
3248-
connector_builder_factory.create_concurrent_cursor_from_datetime_based_cursor(
3249-
state_manager=connector_state_manager,
3250-
model_type=DatetimeBasedCursorModel,
3251-
component_definition=cursor_component_definition,
3252-
stream_name=stream_name,
3253-
stream_namespace=None,
3254-
config=config,
3255-
stream_state={},
3256-
)
3244+
concurrent_cursor = connector_builder_factory.create_concurrent_cursor_from_datetime_based_cursor(
3245+
state_manager=connector_state_manager,
3246+
model_type=DatetimeBasedCursorModel,
3247+
component_definition=cursor_component_definition,
3248+
stream_name=stream_name,
3249+
stream_namespace=None,
3250+
config=config,
3251+
stream_state={},
32573252
)
32583253

32593254
assert concurrent_cursor.start == expected_start

unit_tests/sources/declarative/test_concurrent_declarative_source.py

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -479,20 +479,19 @@ def test_group_streams():
479479
synchronous_streams = source._synchronous_streams
480480

481481
# 2 incremental streams
482-
assert len(concurrent_streams) == 2
483-
concurrent_stream_0, concurrent_stream_1 = concurrent_streams
482+
assert len(concurrent_streams) == 3
483+
concurrent_stream_0, concurrent_stream_1, concurrent_stream_2 = concurrent_streams
484484
assert isinstance(concurrent_stream_0, DefaultStream)
485485
assert concurrent_stream_0.name == "party_members"
486486
assert isinstance(concurrent_stream_1, DefaultStream)
487-
assert concurrent_stream_1.name == "locations"
487+
assert concurrent_stream_1.name == "palaces"
488+
assert isinstance(concurrent_stream_2, DefaultStream)
489+
assert concurrent_stream_2.name == "locations"
488490

489491
# 1 full refresh stream, 1 substream
490-
assert len(synchronous_streams) == 2
491-
synchronous_stream_0, synchronous_stream_1 = synchronous_streams
492-
assert isinstance(synchronous_stream_0, DeclarativeStream)
493-
assert synchronous_stream_0.name == "palaces"
494-
assert isinstance(synchronous_stream_1, DeclarativeStream)
495-
assert synchronous_stream_1.name == "party_members_skills"
492+
assert len(synchronous_streams) == 1
493+
assert isinstance(synchronous_streams[0], DeclarativeStream)
494+
assert synchronous_streams[0].name == "party_members_skills"
496495

497496

498497
@freezegun.freeze_time(time_to_freeze=datetime(2024, 9, 1, 0, 0, 0, 0, tzinfo=timezone.utc))
@@ -539,7 +538,7 @@ def test_create_concurrent_cursor():
539538
assert party_members_cursor._lookback_window == timedelta(days=5)
540539
assert party_members_cursor._cursor_granularity == timedelta(days=1)
541540

542-
locations_stream = source._concurrent_streams[1]
541+
locations_stream = source._concurrent_streams[2]
543542
assert isinstance(locations_stream, DefaultStream)
544543
locations_cursor = locations_stream.cursor
545544

@@ -754,7 +753,7 @@ def test_read_with_concurrent_and_synchronous_streams():
754753
assert len(palaces_states) == 1
755754
assert (
756755
palaces_states[0].stream.stream_state.__dict__
757-
== AirbyteStateBlob(__ab_full_refresh_sync_complete=True).__dict__
756+
== AirbyteStateBlob(__ab_no_cursor_state_message=True).__dict__
758757
)
759758

760759
# Expects 3 records, 3 slices, 3 records in slice
@@ -1275,8 +1274,8 @@ def test_streams_with_stream_state_interpolation_should_be_synchronous():
12751274
state=None,
12761275
)
12771276

1278-
assert len(source._concurrent_streams) == 0
1279-
assert len(source._synchronous_streams) == 4
1277+
assert len(source._concurrent_streams) == 1
1278+
assert len(source._synchronous_streams) == 3
12801279

12811280

12821281
def test_given_partition_routing_and_incremental_sync_then_stream_is_not_concurrent():
@@ -1571,5 +1570,6 @@ def get_states_for_stream(
15711570

15721571

15731572
def disable_emitting_sequential_state_messages(source: ConcurrentDeclarativeSource) -> None:
1574-
for concurrent_streams in source._concurrent_streams: # type: ignore # This is the easiest way to disable behavior from the test
1575-
concurrent_streams.cursor._connector_state_converter._is_sequential_state = False # type: ignore # see above
1573+
for concurrent_stream in source._concurrent_streams: # type: ignore # This is the easiest way to disable behavior from the test
1574+
if isinstance(concurrent_stream.cursor, ConcurrentCursor):
1575+
concurrent_stream.cursor._connector_state_converter._is_sequential_state = False # type: ignore # see above

0 commit comments

Comments
 (0)