@@ -79,6 +79,9 @@ def __init__(
7979 emit_connector_builder_messages = emit_connector_builder_messages ,
8080 disable_resumable_full_refresh = True ,
8181 )
82+ self ._config = config
83+ self ._concurrent_streams : Optional [List [AbstractStream ]] = None
84+ self ._synchronous_streams : Optional [List [Stream ]] = None
8285
8386 super ().__init__ (
8487 source_config = source_config ,
@@ -89,21 +92,6 @@ def __init__(
8992
9093 self ._state = state
9194
92- self ._concurrent_streams : Optional [List [AbstractStream ]]
93- self ._synchronous_streams : Optional [List [Stream ]]
94-
95- # If the connector command was SPEC, there is no incoming config, and we cannot instantiate streams because
96- # they might depend on it. Ideally we want to have a static method on this class to get the spec without
97- # any other arguments, but the existing entrypoint.py isn't designed to support this. Just noting this
98- # for our future improvements to the CDK.
99- if config :
100- self ._concurrent_streams , self ._synchronous_streams = self ._group_streams (
101- config = config or {}
102- )
103- else :
104- self ._concurrent_streams = None
105- self ._synchronous_streams = None
106-
10795 concurrency_level_from_manifest = self ._source_config .get ("concurrency_level" )
10896 if concurrency_level_from_manifest :
10997 concurrency_level_component = self ._constructor .create_component (
@@ -132,6 +120,19 @@ def __init__(
132120 message_repository = self .message_repository , # type: ignore # message_repository is always instantiated with a value by factory
133121 )
134122
123+ def _actually_group (self ) -> None :
124+ # If the connector command was SPEC, there is no incoming config, and we cannot instantiate streams because
125+ # they might depend on it. Ideally we want to have a static method on this class to get the spec without
126+ # any other arguments, but the existing entrypoint.py isn't designed to support this. Just noting this
127+ # for our future improvements to the CDK.
128+ if self ._config :
129+ self ._concurrent_streams , self ._synchronous_streams = self ._group_streams (
130+ config = self ._config or {}
131+ )
132+ else :
133+ self ._concurrent_streams = None
134+ self ._synchronous_streams = None
135+
135136 def read (
136137 self ,
137138 logger : logging .Logger ,
@@ -141,6 +142,9 @@ def read(
141142 ) -> Iterator [AirbyteMessage ]:
142143 # ConcurrentReadProcessor pops streams that are finished being read so before syncing, the names of the concurrent
143144 # streams must be saved so that they can be removed from the catalog before starting synchronous streams
145+ if self ._concurrent_streams is None :
146+ self ._actually_group ()
147+
144148 if self ._concurrent_streams :
145149 concurrent_stream_names = set (
146150 [concurrent_stream .name for concurrent_stream in self ._concurrent_streams ]
@@ -166,6 +170,9 @@ def read(
166170 yield from super ().read (logger , config , filtered_catalog , state )
167171
168172 def discover (self , logger : logging .Logger , config : Mapping [str , Any ]) -> AirbyteCatalog :
173+ if self ._concurrent_streams is None :
174+ self ._actually_group ()
175+
169176 concurrent_streams = self ._concurrent_streams or []
170177 synchronous_streams = self ._synchronous_streams or []
171178 return AirbyteCatalog (
0 commit comments