Skip to content

Commit b28c6e3

Browse files
authored
fix: query properties for custom requesters (#783)
1 parent 9b9f7bd commit b28c6e3

File tree

3 files changed

+130
-14
lines changed

3 files changed

+130
-14
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2403,21 +2403,12 @@ def create_http_requester(
24032403

24042404
api_budget = self._api_budget
24052405

2406-
# Removes QueryProperties components from the interpolated mappings because it has been designed
2407-
# to be used by the SimpleRetriever and will be resolved from the provider from the slice directly
2408-
# instead of through jinja interpolation
2409-
request_parameters: Optional[Union[str, Mapping[str, str]]]
2410-
if isinstance(model.request_parameters, Mapping):
2411-
request_parameters = self._remove_query_properties(model.request_parameters)
2412-
else:
2413-
request_parameters = model.request_parameters
2414-
24152406
request_options_provider = InterpolatedRequestOptionsProvider(
24162407
request_body=model.request_body,
24172408
request_body_data=model.request_body_data,
24182409
request_body_json=model.request_body_json,
24192410
request_headers=model.request_headers,
2420-
request_parameters=request_parameters,
2411+
request_parameters=model.request_parameters, # type: ignore # QueryProperties have been removed in `create_simple_retriever`
24212412
query_properties_key=query_properties_key,
24222413
config=config,
24232414
parameters=model.parameters or {},
@@ -3207,7 +3198,8 @@ def _get_url(req: Requester) -> str:
32073198

32083199
query_properties: Optional[QueryProperties] = None
32093200
query_properties_key: Optional[str] = None
3210-
if self._query_properties_in_request_parameters(model.requester):
3201+
self._ensure_query_properties_to_model(model.requester)
3202+
if self._has_query_properties_in_request_parameters(model.requester):
32113203
# It is better to be explicit about an error if PropertiesFromEndpoint is defined in multiple
32123204
# places instead of default to request_parameters which isn't clearly documented
32133205
if (
@@ -3219,7 +3211,7 @@ def _get_url(req: Requester) -> str:
32193211
)
32203212

32213213
query_properties_definitions = []
3222-
for key, request_parameter in model.requester.request_parameters.items(): # type: ignore # request_parameters is already validated to be a Mapping using _query_properties_in_request_parameters()
3214+
for key, request_parameter in model.requester.request_parameters.items(): # type: ignore # request_parameters is already validated to be a Mapping using _has_query_properties_in_request_parameters()
32233215
if isinstance(request_parameter, QueryPropertiesModel):
32243216
query_properties_key = key
32253217
query_properties_definitions.append(request_parameter)
@@ -3233,6 +3225,16 @@ def _get_url(req: Requester) -> str:
32333225
query_properties = self._create_component_from_model(
32343226
model=query_properties_definitions[0], config=config
32353227
)
3228+
3229+
# Removes QueryProperties components from the interpolated mappings because it has been designed
3230+
# to be used by the SimpleRetriever and will be resolved from the provider from the slice directly
3231+
# instead of through jinja interpolation
3232+
if hasattr(model.requester, "request_parameters") and isinstance(
3233+
model.requester.request_parameters, Mapping
3234+
):
3235+
model.requester.request_parameters = self._remove_query_properties(
3236+
model.requester.request_parameters
3237+
)
32363238
elif (
32373239
hasattr(model.requester, "fetch_properties_from_endpoint")
32383240
and model.requester.fetch_properties_from_endpoint
@@ -3369,7 +3371,7 @@ def _should_limit_slices_fetched(self) -> bool:
33693371
return bool(self._limit_slices_fetched or self._emit_connector_builder_messages)
33703372

33713373
@staticmethod
3372-
def _query_properties_in_request_parameters(
3374+
def _has_query_properties_in_request_parameters(
33733375
requester: Union[HttpRequesterModel, CustomRequesterModel],
33743376
) -> bool:
33753377
if not hasattr(requester, "request_parameters"):
@@ -4183,3 +4185,26 @@ def create_grouping_partition_router(
41834185
deduplicate=model.deduplicate if model.deduplicate is not None else True,
41844186
config=config,
41854187
)
4188+
4189+
def _ensure_query_properties_to_model(
4190+
self, requester: Union[HttpRequesterModel, CustomRequesterModel]
4191+
) -> None:
4192+
"""
4193+
For some reason, it seems like CustomRequesterModel request_parameters stays as dictionaries which means that
4194+
the other conditions relying on it being QueryPropertiesModel instead of a dict fail. Here, we migrate them to
4195+
proper model.
4196+
"""
4197+
if not hasattr(requester, "request_parameters"):
4198+
return
4199+
4200+
request_parameters = requester.request_parameters
4201+
if request_parameters and isinstance(request_parameters, Dict):
4202+
for request_parameter_key in request_parameters.keys():
4203+
request_parameter = request_parameters[request_parameter_key]
4204+
if (
4205+
isinstance(request_parameter, Dict)
4206+
and request_parameter.get("type") == "QueryProperties"
4207+
):
4208+
request_parameters[request_parameter_key] = QueryPropertiesModel.parse_obj(
4209+
request_parameter
4210+
)

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
4+
import json
45
from copy import deepcopy
56

67
# mypy: ignore-errors
@@ -1189,6 +1190,73 @@ def test_incremental_stream_with_custom_retriever_and_partition_router():
11891190
)
11901191

11911192

1193+
def test_stream_with_custom_requester_and_query_properties(requests_mock):
    """
    Build a DeclarativeStream whose SimpleRetriever uses a CustomRequester with request_parameters
    that mix a plain value and a QueryProperties definition, then read the first partition against
    a mocked endpoint and check that exactly one record comes back.
    """
    content = """
    a_stream:
      type: DeclarativeStream
      primary_key: id
      schema_loader:
        type: InlineSchemaLoader
        schema:
          $schema: "http://json-schema.org/draft-07/schema"
          type: object
          properties:
            id:
              type: string
      retriever:
        type: SimpleRetriever
        name: "{{ parameters['name'] }}"
        decoder:
          type: JsonDecoder
        requester:
          type: CustomRequester
          class_name: unit_tests.sources.declarative.parsers.testing_components.TestingRequester
          name: "{{ parameters['name'] }}"
          url_base: "https://api.sendgrid.com/v3/"
          path: "path"
          http_method: "GET"
          request_parameters:
            not_query: 1
            query:
              type: QueryProperties
              property_list:
                - id
                - field
              always_include_properties:
                - id
              property_chunking:
                type: PropertyChunking
                property_limit_type: property_count
                property_limit: 18
        record_selector:
          type: RecordSelector
          extractor:
            type: DpathExtractor
            field_path: ["records"]
      $parameters:
        name: a_stream
    """

    # Manifest -> resolved references -> types and parameters propagated onto the stream definition.
    resolved_manifest = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content))
    stream_definition = transformer.propagate_types_and_parameters(
        "", resolved_manifest["a_stream"], {}
    )

    stream = factory.create_component(
        model_type=DeclarativeStreamModel,
        component_definition=stream_definition,
        config=input_config,
    )

    # The endpoint answers with a single record under the "records" key read by the extractor.
    response_body = json.dumps({"records": [{"id": "1"}]})
    requests_mock.get("https://api.sendgrid.com/v3/path", text=response_body, status_code=200)

    first_partition = next(stream.generate_partitions())
    records = list(first_partition.read())

    assert len(records) == 1
1258+
1259+
11921260
@pytest.mark.parametrize(
11931261
"use_legacy_state",
11941262
[

unit_tests/sources/declarative/parsers/testing_components.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,18 @@
88
from airbyte_cdk.sources.declarative.extractors import DpathExtractor
99
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
1010
from airbyte_cdk.sources.declarative.partition_routers import SubstreamPartitionRouter
11-
from airbyte_cdk.sources.declarative.requesters import RequestOption
11+
from airbyte_cdk.sources.declarative.requesters import HttpRequester, RequestOption
1212
from airbyte_cdk.sources.declarative.requesters.error_handlers import DefaultErrorHandler
1313
from airbyte_cdk.sources.declarative.requesters.paginators import (
1414
DefaultPaginator,
1515
PaginationStrategy,
1616
)
17+
from airbyte_cdk.sources.declarative.requesters.request_options import (
18+
InterpolatedRequestOptionsProvider,
19+
)
20+
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import (
21+
RequestInput,
22+
)
1723
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
1824

1925

@@ -82,3 +88,20 @@ def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
8288
"updated_at": "2024-02-01T00:00:00.000000+00:00"
8389
}
8490
return stream_state
91+
92+
93+
@dataclass
class TestingRequester(HttpRequester):
    """
    HttpRequester used by the tests as a custom requester: it accepts `request_parameters`
    directly and wires them into its own InterpolatedRequestOptionsProvider.
    """

    # Request parameters handed to the interpolated request options provider built below.
    request_parameters: Optional[RequestInput] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Create the request options provider from the configured request parameters, then run the
        parent initialization.
        """
        options_provider = InterpolatedRequestOptionsProvider(
            request_parameters=self.request_parameters,
            config=self.config,
            parameters=parameters or {},
        )
        # Assigned before delegating to the parent, as the original ordering does.
        self.request_options_provider = options_provider
        super().__post_init__(parameters)

0 commit comments

Comments
 (0)