197197 ],
198198}
199199
200+ _MANIFEST_WITH_HTTP_COMPONENT_RESOLVER_WITH_RETRIEVER_WITH_PARENT_STREAM = {
201+ "version" : "6.7.0" ,
202+ "type" : "DeclarativeSource" ,
203+ "check" : {"type" : "CheckStream" , "stream_names" : ["Rates" ]},
204+ "dynamic_streams" : [
205+ {
206+ "type" : "DynamicDeclarativeStream" ,
207+ "stream_template" : {
208+ "type" : "DeclarativeStream" ,
209+ "name" : "" ,
210+ "primary_key" : [],
211+ "schema_loader" : {
212+ "type" : "InlineSchemaLoader" ,
213+ "schema" : {
214+ "$schema" : "http://json-schema.org/schema#" ,
215+ "properties" : {
216+ "ABC" : {"type" : "number" },
217+ "AED" : {"type" : "number" },
218+ },
219+ "type" : "object" ,
220+ },
221+ },
222+ "retriever" : {
223+ "type" : "SimpleRetriever" ,
224+ "requester" : {
225+ "type" : "HttpRequester" ,
226+ "url_base" : "https://api.test.com" ,
227+ "path" : "" ,
228+ "http_method" : "GET" ,
229+ "authenticator" : {
230+ "type" : "ApiKeyAuthenticator" ,
231+ "header" : "apikey" ,
232+ "api_token" : "{{ config['api_key'] }}" ,
233+ },
234+ },
235+ "record_selector" : {
236+ "type" : "RecordSelector" ,
237+ "extractor" : {"type" : "DpathExtractor" , "field_path" : []},
238+ },
239+ "paginator" : {"type" : "NoPagination" },
240+ },
241+ },
242+ "components_resolver" : {
243+ "type" : "HttpComponentsResolver" ,
244+ "retriever" : {
245+ "type" : "SimpleRetriever" ,
246+ "requester" : {
247+ "type" : "HttpRequester" ,
248+ "url_base" : "https://api.test.com" ,
249+ "path" : "parent/{{ stream_partition.parent_id }}/items" ,
250+ "http_method" : "GET" ,
251+ "authenticator" : {
252+ "type" : "ApiKeyAuthenticator" ,
253+ "header" : "apikey" ,
254+ "api_token" : "{{ config['api_key'] }}" ,
255+ },
256+ },
257+ "record_selector" : {
258+ "type" : "RecordSelector" ,
259+ "extractor" : {"type" : "DpathExtractor" , "field_path" : []},
260+ },
261+ "paginator" : {"type" : "NoPagination" },
262+ "partition_router" : {
263+ "type" : "SubstreamPartitionRouter" ,
264+ "parent_stream_configs" : [
265+ {
266+ "type" : "ParentStreamConfig" ,
267+ "parent_key" : "id" ,
268+ "partition_field" : "parent_id" ,
269+ "stream" : {
270+ "type" : "DeclarativeStream" ,
271+ "name" : "parent" ,
272+ "retriever" : {
273+ "type" : "SimpleRetriever" ,
274+ "requester" : {
275+ "type" : "HttpRequester" ,
276+ "url_base" : "https://api.test.com" ,
277+ "path" : "/parents" ,
278+ "http_method" : "GET" ,
279+ "authenticator" : {
280+ "type" : "ApiKeyAuthenticator" ,
281+ "header" : "apikey" ,
282+ "api_token" : "{{ config['api_key'] }}" ,
283+ },
284+ },
285+ "record_selector" : {
286+ "type" : "RecordSelector" ,
287+ "extractor" : {
288+ "type" : "DpathExtractor" ,
289+ "field_path" : [],
290+ },
291+ },
292+ },
293+ "schema_loader" : {
294+ "type" : "InlineSchemaLoader" ,
295+ "schema" : {
296+ "$schema" : "http://json-schema.org/schema#" ,
297+ "properties" : {"id" : {"type" : "integer" }},
298+ "type" : "object" ,
299+ },
300+ },
301+ },
302+ }
303+ ],
304+ },
305+ },
306+ "components_mapping" : [
307+ {
308+ "type" : "ComponentMappingDefinition" ,
309+ "field_path" : ["name" ],
310+ "value" : "parent_{{stream_slice['parent_id']}}_{{components_values['name']}}" ,
311+ },
312+ {
313+ "type" : "ComponentMappingDefinition" ,
314+ "field_path" : [
315+ "retriever" ,
316+ "requester" ,
317+ "path" ,
318+ ],
319+ "value" : "{{ stream_slice['parent_id'] }}/{{ components_values['id'] }}" ,
320+ },
321+ ],
322+ },
323+ }
324+ ],
325+ }
326+
200327
201328@pytest .mark .parametrize (
202329 "components_mapping, retriever_data, stream_template_config, expected_result" ,
@@ -221,6 +348,44 @@ def test_http_components_resolver(
221348):
222349 mock_retriever = MagicMock ()
223350 mock_retriever .read_records .return_value = retriever_data
351+ mock_retriever .stream_slices .return_value = [{}]
352+ config = {}
353+
354+ resolver = HttpComponentsResolver (
355+ retriever = mock_retriever ,
356+ config = config ,
357+ components_mapping = components_mapping ,
358+ parameters = {},
359+ )
360+
361+ result = list (resolver .resolve_components (stream_template_config = stream_template_config ))
362+ assert result == expected_result
363+
364+
365+ @pytest .mark .parametrize (
366+ "components_mapping, retriever_data, stream_template_config, expected_result" ,
367+ [
368+ (
369+ [
370+ ComponentMappingDefinition (
371+ field_path = [InterpolatedString .create ("path" , parameters = {})],
372+ value = "{{stream_slice['parent_id']}}/{{components_values['id']}}" ,
373+ value_type = str ,
374+ parameters = {},
375+ )
376+ ],
377+ [{"id" : "1" , "field1" : "data1" }, {"id" : "2" , "field1" : "data2" }],
378+ {"path" : None },
379+ [{"path" : "1/1" }, {"path" : "1/2" }, {"path" : "2/1" }, {"path" : "2/2" }],
380+ )
381+ ],
382+ )
383+ def test_http_components_resolver_with_stream_slices (
384+ components_mapping , retriever_data , stream_template_config , expected_result
385+ ):
386+ mock_retriever = MagicMock ()
387+ mock_retriever .read_records .return_value = retriever_data
388+ mock_retriever .stream_slices .return_value = [{"parent_id" : 1 }, {"parent_id" : 2 }]
224389 config = {}
225390
226391 resolver = HttpComponentsResolver (
@@ -305,3 +470,66 @@ def test_duplicated_dynamic_streams_read_with_http_components_resolver():
305470 str (exc_info .value )
306471 == "Dynamic streams list contains a duplicate name: item_2. Please contact Airbyte Support."
307472 )
473+
474+
475+ def test_dynamic_streams_with_http_components_resolver_retriever_with_parent_stream ():
476+ expected_stream_names = [
477+ "parent_1_item_1" ,
478+ "parent_1_item_2" ,
479+ "parent_2_item_1" ,
480+ "parent_2_item_2" ,
481+ ]
482+ with HttpMocker () as http_mocker :
483+ http_mocker .get (
484+ HttpRequest (url = "https://api.test.com/parents" ),
485+ HttpResponse (body = json .dumps ([{"id" : 1 }, {"id" : 2 }])),
486+ )
487+ parent_ids = [1 , 2 ]
488+ for parent_id in parent_ids :
489+ http_mocker .get (
490+ HttpRequest (url = f"https://api.test.com/parent/{ parent_id } /items" ),
491+ HttpResponse (
492+ body = json .dumps (
493+ [
494+ {"id" : 1 , "name" : "item_1" },
495+ {"id" : 2 , "name" : "item_2" },
496+ ]
497+ )
498+ ),
499+ )
500+ dynamic_stream_paths = ["1/1" , "2/1" , "1/2" , "2/2" ]
501+ for dynamic_stream_path in dynamic_stream_paths :
502+ http_mocker .get (
503+ HttpRequest (url = f"https://api.test.com/{ dynamic_stream_path } " ),
504+ HttpResponse (body = json .dumps ([{"ABC" : 1 , "AED" : 2 }])),
505+ )
506+
507+ source = ConcurrentDeclarativeSource (
508+ source_config = _MANIFEST_WITH_HTTP_COMPONENT_RESOLVER_WITH_RETRIEVER_WITH_PARENT_STREAM ,
509+ config = _CONFIG ,
510+ catalog = None ,
511+ state = None ,
512+ )
513+
514+ actual_catalog = source .discover (logger = source .logger , config = _CONFIG )
515+
516+ configured_streams = [
517+ to_configured_stream (stream , primary_key = stream .source_defined_primary_key )
518+ for stream in actual_catalog .streams
519+ ]
520+ configured_catalog = to_configured_catalog (configured_streams )
521+
522+ records = [
523+ message .record
524+ for message in source .read (MagicMock (), _CONFIG , configured_catalog )
525+ if message .type == Type .RECORD
526+ ]
527+
528+ assert len (actual_catalog .streams ) == 4
529+ assert [stream .name for stream in actual_catalog .streams ] == expected_stream_names
530+ assert len (records ) == 4
531+
532+ actual_record_stream_names = [record .stream for record in records ]
533+ actual_record_stream_names .sort ()
534+
535+ assert actual_record_stream_names == expected_stream_names
0 commit comments