@@ -190,28 +190,48 @@ def _group_streams(
190190 # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
191191 # so we need to treat them as synchronous
192192 if isinstance (declarative_stream , DeclarativeStream ):
193- datetime_based_cursor_component_definition = name_to_stream_mapping [declarative_stream .name ].get ("incremental_sync" )
194-
195- is_without_partition_router_nor_cursor = not bool (datetime_based_cursor_component_definition ) and not (name_to_stream_mapping [declarative_stream .name ].get ("retriever" , {}).get ("partition_router" ))
196- is_datetime_incremental_with_partition_routing = self ._is_datetime_incremental_with_partition_routing (datetime_based_cursor_component_definition ,
197- declarative_stream )
198- if is_without_partition_router_nor_cursor or is_datetime_incremental_with_partition_routing :
193+ datetime_based_cursor_component_definition = name_to_stream_mapping [
194+ declarative_stream .name
195+ ].get ("incremental_sync" )
196+
197+ is_without_partition_router_nor_cursor = not bool (
198+ datetime_based_cursor_component_definition
199+ ) and not (
200+ name_to_stream_mapping [declarative_stream .name ]
201+ .get ("retriever" , {})
202+ .get ("partition_router" )
203+ )
204+ is_datetime_incremental_with_partition_routing = (
205+ self ._is_datetime_incremental_with_partition_routing (
206+ datetime_based_cursor_component_definition , declarative_stream
207+ )
208+ )
209+ if (
210+ is_without_partition_router_nor_cursor
211+ or is_datetime_incremental_with_partition_routing
212+ ):
199213 stream_state = state_manager .get_stream_state (
200214 stream_name = declarative_stream .name , namespace = declarative_stream .namespace
201215 )
202216
203217 if is_datetime_incremental_with_partition_routing :
204- cursor : Cursor = self ._constructor .create_concurrent_cursor_from_datetime_based_cursor (
205- state_manager = state_manager ,
206- model_type = DatetimeBasedCursorModel ,
207- component_definition = datetime_based_cursor_component_definition ,
208- stream_name = declarative_stream .name ,
209- stream_namespace = declarative_stream .namespace ,
210- config = config or {},
211- stream_state = stream_state ,
218+ cursor : Cursor = (
219+ self ._constructor .create_concurrent_cursor_from_datetime_based_cursor (
220+ state_manager = state_manager ,
221+ model_type = DatetimeBasedCursorModel ,
222+ component_definition = datetime_based_cursor_component_definition ,
223+ stream_name = declarative_stream .name ,
224+ stream_namespace = declarative_stream .namespace ,
225+ config = config or {},
226+ stream_state = stream_state ,
227+ )
212228 )
213229 else :
214- cursor = FinalStateCursor (declarative_stream .name , declarative_stream .namespace , self .message_repository )
230+ cursor = FinalStateCursor (
231+ declarative_stream .name ,
232+ declarative_stream .namespace ,
233+ self .message_repository ,
234+ )
215235
216236 partition_generator = StreamSlicerPartitionGenerator (
217237 DeclarativePartitionFactory (
@@ -234,7 +254,9 @@ def _group_streams(
234254 json_schema = declarative_stream .get_json_schema (),
235255 availability_strategy = AlwaysAvailableAvailabilityStrategy (),
236256 primary_key = get_primary_key_from_stream (declarative_stream .primary_key ),
237- cursor_field = cursor .cursor_field .cursor_field_key if hasattr (cursor , "cursor_field" ) else None ,
257+ cursor_field = cursor .cursor_field .cursor_field_key
258+ if hasattr (cursor , "cursor_field" )
259+ else None ,
238260 logger = self .logger ,
239261 cursor = cursor ,
240262 )
@@ -246,8 +268,21 @@ def _group_streams(
246268
247269 return concurrent_streams , synchronous_streams
248270
249- def _is_datetime_incremental_with_partition_routing (self , datetime_based_cursor_component_definition : Mapping [str , Any ], declarative_stream : DeclarativeStream ) -> bool :
250- return bool (datetime_based_cursor_component_definition ) and datetime_based_cursor_component_definition .get ("type" , "" ) == DatetimeBasedCursorModel .__name__ and self ._stream_supports_concurrent_partition_processing (declarative_stream = declarative_stream ) and hasattr (declarative_stream .retriever , "stream_slicer" ) and isinstance (declarative_stream .retriever .stream_slicer , DatetimeBasedCursor )
271+ def _is_datetime_incremental_with_partition_routing (
272+ self ,
273+ datetime_based_cursor_component_definition : Mapping [str , Any ],
274+ declarative_stream : DeclarativeStream ,
275+ ) -> bool :
276+ return (
277+ bool (datetime_based_cursor_component_definition )
278+ and datetime_based_cursor_component_definition .get ("type" , "" )
279+ == DatetimeBasedCursorModel .__name__
280+ and self ._stream_supports_concurrent_partition_processing (
281+ declarative_stream = declarative_stream
282+ )
283+ and hasattr (declarative_stream .retriever , "stream_slicer" )
284+ and isinstance (declarative_stream .retriever .stream_slicer , DatetimeBasedCursor )
285+ )
251286
252287 def _stream_supports_concurrent_partition_processing (
253288 self , declarative_stream : DeclarativeStream
0 commit comments