@@ -206,7 +206,7 @@ def _group_streams(
206206 # so we need to treat them as synchronous
207207 if (
208208 isinstance (declarative_stream , DeclarativeStream )
209- and name_to_stream_mapping [declarative_stream .name ]. get ( "retriever" ) ["type" ]
209+ and name_to_stream_mapping [declarative_stream .name ][ "retriever" ] ["type" ]
210210 == "SimpleRetriever"
211211 ):
212212 incremental_sync_component_definition = name_to_stream_mapping [
@@ -215,7 +215,7 @@ def _group_streams(
215215
216216 partition_router_component_definition = (
217217 name_to_stream_mapping [declarative_stream .name ]
218- .get ("retriever" )
218+ .get ("retriever" , {} )
219219 .get ("partition_router" )
220220 )
221221 is_without_partition_router_or_cursor = not bool (
@@ -237,7 +237,7 @@ def _group_streams(
237237 cursor = self ._constructor .create_concurrent_cursor_from_datetime_based_cursor (
238238 state_manager = state_manager ,
239239 model_type = DatetimeBasedCursorModel ,
240- component_definition = incremental_sync_component_definition ,
240+ component_definition = incremental_sync_component_definition , # type: ignore # Not None because of the if condition above
241241 stream_name = declarative_stream .name ,
242242 stream_namespace = declarative_stream .namespace ,
243243 config = config or {},
@@ -320,10 +320,11 @@ def _group_streams(
320320 def _is_datetime_incremental_without_partition_routing (
321321 self ,
322322 declarative_stream : DeclarativeStream ,
323- incremental_sync_component_definition : Mapping [str , Any ],
323+ incremental_sync_component_definition : Mapping [str , Any ] | None ,
324324 ) -> bool :
325325 return (
326- bool (incremental_sync_component_definition )
326+ incremental_sync_component_definition is not None
327+ and bool (incremental_sync_component_definition )
327328 and incremental_sync_component_definition .get ("type" , "" )
328329 == DatetimeBasedCursorModel .__name__
329330 and self ._stream_supports_concurrent_partition_processing (
0 commit comments