1414 INJECTED_COMPONENTS_PY ,
1515 INJECTED_COMPONENTS_PY_CHECKSUMS ,
1616)
17+ from airbyte_cdk .utils .airbyte_secrets_utils import filter_secrets
1718
1819from ..api_models import (
1920 CheckRequest ,
2021 CheckResponse ,
2122 DiscoverRequest ,
2223 DiscoverResponse ,
24+ ErrorResponse ,
2325 FullResolveRequest ,
2426 Manifest ,
2527 ManifestResponse ,
@@ -64,7 +66,13 @@ def safe_build_source(
6466)
6567
6668
67- @router .post ("/test_read" , operation_id = "testRead" )
69+ @router .post (
70+ "/test_read" ,
71+ operation_id = "testRead" ,
72+ responses = {
73+ 400 : {"description" : "Bad Request - Error processing request" , "model" : ErrorResponse }
74+ },
75+ )
6876def test_read (request : StreamTestReadRequest ) -> StreamReadResponse :
6977 """
7078 Test reading from a specific stream in the manifest.
@@ -87,8 +95,19 @@ def test_read(request: StreamTestReadRequest) -> StreamReadResponse:
8795 "md5" : hashlib .md5 (request .custom_components_code .encode ()).hexdigest ()
8896 }
8997
98+ # We enforce a concurrency level of 1 so that the stream is processed on a single thread
99+ # to retain ordering for the grouping of the builder message responses.
100+ manifest = request .manifest .model_dump ()
101+ if "concurrency_level" in manifest :
102+ manifest ["concurrency_level" ]["default_concurrency" ] = 1
103+ else :
104+ manifest ["concurrency_level" ] = {
105+ "type" : "ConcurrencyLevel" ,
106+ "default_concurrency" : 1 ,
107+ }
108+
90109 source = safe_build_source (
91- request . manifest . model_dump () ,
110+ manifest ,
92111 config_dict ,
93112 catalog ,
94113 converted_state ,
@@ -98,18 +117,29 @@ def test_read(request: StreamTestReadRequest) -> StreamReadResponse:
98117 )
99118
100119 runner = ManifestCommandProcessor (source )
101- cdk_result = runner .test_read (
102- config_dict ,
103- catalog ,
104- converted_state ,
105- request .record_limit ,
106- request .page_limit ,
107- request .slice_limit ,
108- )
109- return StreamReadResponse .model_validate (asdict (cdk_result ))
120+ try :
121+ cdk_result = runner .test_read (
122+ config_dict ,
123+ catalog ,
124+ converted_state ,
125+ request .record_limit ,
126+ request .page_limit ,
127+ request .slice_limit ,
128+ )
129+ return StreamReadResponse .model_validate (asdict (cdk_result ))
130+ except Exception as exc :
131+ # Filter secrets from error message before returning to client
132+ sanitized_message = filter_secrets (f"Error reading stream: { str (exc )} " )
133+ raise HTTPException (status_code = 400 , detail = sanitized_message )
110134
111135
112- @router .post ("/check" , operation_id = "check" )
136+ @router .post (
137+ "/check" ,
138+ operation_id = "check" ,
139+ responses = {
140+ 400 : {"description" : "Bad Request - Error processing request" , "model" : ErrorResponse }
141+ },
142+ )
113143def check (request : CheckRequest ) -> CheckResponse :
114144 """Check configuration against a manifest"""
115145 # Apply trace tags from context if provided
@@ -119,13 +149,24 @@ def check(request: CheckRequest) -> CheckResponse:
119149 project_id = request .context .project_id ,
120150 )
121151
122- source = safe_build_source (request .manifest .model_dump (), request .config .model_dump ())
123- runner = ManifestCommandProcessor (source )
124- success , message = runner .check_connection (request .config .model_dump ())
125- return CheckResponse (success = success , message = message )
152+ try :
153+ source = safe_build_source (request .manifest .model_dump (), request .config .model_dump ())
154+ runner = ManifestCommandProcessor (source )
155+ success , message = runner .check_connection (request .config .model_dump ())
156+ return CheckResponse (success = success , message = message )
157+ except Exception as exc :
158+ # Filter secrets from error message before returning to client
159+ sanitized_message = filter_secrets (f"Error checking connection: { str (exc )} " )
160+ raise HTTPException (status_code = 400 , detail = sanitized_message )
126161
127162
128- @router .post ("/discover" , operation_id = "discover" )
163+ @router .post (
164+ "/discover" ,
165+ operation_id = "discover" ,
166+ responses = {
167+ 400 : {"description" : "Bad Request - Error processing request" , "model" : ErrorResponse }
168+ },
169+ )
129170def discover (request : DiscoverRequest ) -> DiscoverResponse :
130171 """Discover streams from a manifest"""
131172 # Apply trace tags from context if provided
@@ -135,15 +176,31 @@ def discover(request: DiscoverRequest) -> DiscoverResponse:
135176 project_id = request .context .project_id ,
136177 )
137178
138- source = safe_build_source (request .manifest .model_dump (), request .config .model_dump ())
139- runner = ManifestCommandProcessor (source )
140- catalog = runner .discover (request .config .model_dump ())
141- if catalog is None :
142- raise HTTPException (status_code = 422 , detail = "Connector did not return a discovered catalog" )
143- return DiscoverResponse (catalog = catalog )
179+ try :
180+ source = safe_build_source (request .manifest .model_dump (), request .config .model_dump ())
181+ runner = ManifestCommandProcessor (source )
182+ catalog = runner .discover (request .config .model_dump ())
183+ if catalog is None :
184+ raise HTTPException (
185+ status_code = 422 , detail = "Connector did not return a discovered catalog"
186+ )
187+ return DiscoverResponse (catalog = catalog )
188+ except HTTPException :
189+ # Re-raise HTTPExceptions as-is (like the catalog None check above)
190+ raise
191+ except Exception as exc :
192+ # Filter secrets from error message before returning to client
193+ sanitized_message = filter_secrets (f"Error discovering streams: { str (exc )} " )
194+ raise HTTPException (status_code = 400 , detail = sanitized_message )
144195
145196
146- @router .post ("/resolve" , operation_id = "resolve" )
197+ @router .post (
198+ "/resolve" ,
199+ operation_id = "resolve" ,
200+ responses = {
201+ 400 : {"description" : "Bad Request - Error processing request" , "model" : ErrorResponse }
202+ },
203+ )
147204def resolve (request : ResolveRequest ) -> ManifestResponse :
148205 """Resolve a manifest to its final configuration."""
149206 # Apply trace tags from context if provided
@@ -153,11 +210,22 @@ def resolve(request: ResolveRequest) -> ManifestResponse:
153210 project_id = request .context .project_id ,
154211 )
155212
156- source = safe_build_source (request .manifest .model_dump (), {})
157- return ManifestResponse (manifest = Manifest (** source .resolved_manifest ))
213+ try :
214+ source = safe_build_source (request .manifest .model_dump (), {})
215+ return ManifestResponse (manifest = Manifest (** source .resolved_manifest ))
216+ except Exception as exc :
217+ # Filter secrets from error message before returning to client
218+ sanitized_message = filter_secrets (f"Error resolving manifest: { str (exc )} " )
219+ raise HTTPException (status_code = 400 , detail = sanitized_message )
158220
159221
160- @router .post ("/full_resolve" , operation_id = "fullResolve" )
222+ @router .post (
223+ "/full_resolve" ,
224+ operation_id = "fullResolve" ,
225+ responses = {
226+ 400 : {"description" : "Bad Request - Error processing request" , "model" : ErrorResponse }
227+ },
228+ )
161229def full_resolve (request : FullResolveRequest ) -> ManifestResponse :
162230 """
163231 Fully resolve a manifest, including dynamic streams.
@@ -171,21 +239,26 @@ def full_resolve(request: FullResolveRequest) -> ManifestResponse:
171239 project_id = request .context .project_id ,
172240 )
173241
174- source = safe_build_source (request .manifest .model_dump (), request .config .model_dump ())
175- manifest = {** source .resolved_manifest }
176- streams = manifest .get ("streams" , [])
177- for stream in streams :
178- stream ["dynamic_stream_name" ] = None
242+ try :
243+ source = safe_build_source (request .manifest .model_dump (), request .config .model_dump ())
244+ manifest = {** source .resolved_manifest }
245+ streams = manifest .get ("streams" , [])
246+ for stream in streams :
247+ stream ["dynamic_stream_name" ] = None
179248
180- mapped_streams : Dict [str , List [Dict [str , Any ]]] = {}
181- for stream in source .dynamic_streams :
182- generated_streams = mapped_streams .setdefault (stream ["dynamic_stream_name" ], [])
249+ mapped_streams : Dict [str , List [Dict [str , Any ]]] = {}
250+ for stream in source .dynamic_streams :
251+ generated_streams = mapped_streams .setdefault (stream ["dynamic_stream_name" ], [])
183252
184- if len (generated_streams ) < request .stream_limit :
185- generated_streams += [stream ]
253+ if len (generated_streams ) < request .stream_limit :
254+ generated_streams += [stream ]
186255
187- for generated_streams_list in mapped_streams .values ():
188- streams .extend (generated_streams_list )
256+ for generated_streams_list in mapped_streams .values ():
257+ streams .extend (generated_streams_list )
189258
190- manifest ["streams" ] = streams
191- return ManifestResponse (manifest = Manifest (** manifest ))
259+ manifest ["streams" ] = streams
260+ return ManifestResponse (manifest = Manifest (** manifest ))
261+ except Exception as exc :
262+ # Filter secrets from error message before returning to client
263+ sanitized_message = filter_secrets (f"Error full resolving manifest: { str (exc )} " )
264+ raise HTTPException (status_code = 400 , detail = sanitized_message )
0 commit comments