Skip to content

Commit 6ea924a

Browse files
maxi297octavia-squidington-iii
andauthored
feat: cache properties from endpoint (#808)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent 26a9b98 commit 6ea924a

File tree

8 files changed

+108
-67
lines changed

8 files changed

+108
-67
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3353,7 +3353,7 @@ definitions:
33533353
- ["code", "type"]
33543354
PropertiesFromEndpoint:
33553355
title: Properties from Endpoint
3356-
description: Defines the behavior for fetching the list of properties from an API that will be loaded into the requests to extract records.
3356+
description: Defines the behavior for fetching the list of properties from an API that will be loaded into the requests to extract records. Note that stream_slices can't be interpolated from this retriever.
33573357
type: object
33583358
required:
33593359
- type

airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,27 @@ class PropertiesFromEndpoint:
2222
config: Config
2323
parameters: InitVar[Mapping[str, Any]]
2424

25+
_cached_properties: Optional[List[str]] = None
26+
2527
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
2628
self._property_field_path = [
2729
InterpolatedString(string=property_field, parameters=parameters)
2830
for property_field in self.property_field_path
2931
]
3032

31-
def get_properties_from_endpoint(self, stream_slice: Optional[StreamSlice]) -> Iterable[str]:
32-
response_properties = self.retriever.read_records(
33-
records_schema={}, stream_slice=stream_slice
34-
)
35-
for property_obj in response_properties:
36-
path = [
37-
node.eval(self.config) if not isinstance(node, str) else node
38-
for node in self._property_field_path
39-
]
40-
yield dpath.get(property_obj, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure
33+
def get_properties_from_endpoint(self) -> List[str]:
34+
if self._cached_properties is None:
35+
self._cached_properties = list(
36+
map(
37+
self._get_property, # type: ignore # SimpleRetriever and AsyncRetriever only returns Record. Should we change the return type of Retriever.read_records?
38+
self.retriever.read_records(records_schema={}, stream_slice=None),
39+
)
40+
)
41+
return self._cached_properties
42+
43+
def _get_property(self, property_obj: Mapping[str, Any]) -> str:
44+
path = [
45+
node.eval(self.config) if not isinstance(node, str) else node
46+
for node in self._property_field_path
47+
]
48+
return str(dpath.get(property_obj, path, default=[])) # type: ignore # extracted will be a MutableMapping, given input data structure

airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
4141

4242
def get_request_property_chunks(
4343
self,
44-
property_fields: Iterable[str],
44+
property_fields: List[str],
4545
always_include_properties: Optional[List[str]],
4646
configured_properties: Optional[Set[str]],
4747
) -> Iterable[List[str]]:

airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,23 +30,16 @@ class QueryProperties:
3030
config: Config
3131
parameters: InitVar[Mapping[str, Any]]
3232

33-
def get_request_property_chunks(
34-
self,
35-
stream_slice: Optional[StreamSlice] = None,
36-
) -> Iterable[List[str]]:
33+
def get_request_property_chunks(self) -> Iterable[List[str]]:
3734
"""
3835
Uses the defined property_list to fetch the total set of properties dynamically or from a static list
3936
and based on the resulting properties, performs property chunking if applicable.
40-
:param stream_slice: The StreamSlice of the current partition being processed during the sync. This is included
41-
because subcomponents of QueryProperties can make use of interpolation of the top-level StreamSlice object
42-
:param configured_stream: The customer configured stream being synced which is needed to identify which
43-
record fields to query for and emit.
4437
"""
38+
fields: List[str]
4539
configured_properties = self.property_selector.select() if self.property_selector else None
4640

47-
fields: Union[Iterable[str], List[str]]
4841
if isinstance(self.property_list, PropertiesFromEndpoint):
49-
fields = self.property_list.get_properties_from_endpoint(stream_slice=stream_slice)
42+
fields = self.property_list.get_properties_from_endpoint()
5043
else:
5144
fields = self.property_list if self.property_list else []
5245

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -385,9 +385,9 @@ def _read_pages(
385385
response = None
386386
try:
387387
if self.additional_query_properties:
388-
for properties in self.additional_query_properties.get_request_property_chunks(
389-
stream_slice=stream_slice,
390-
):
388+
for (
389+
properties
390+
) in self.additional_query_properties.get_request_property_chunks():
391391
stream_slice = StreamSlice(
392392
partition=stream_slice.partition or {},
393393
cursor_slice=stream_slice.cursor_slice or {},
@@ -523,7 +523,6 @@ def read_records(
523523
"""
524524
_slice = stream_slice or StreamSlice(partition={}, cursor_slice={}) # None-check
525525

526-
most_recent_record_from_slice = None
527526
record_generator = partial(
528527
self._parse_records,
529528
stream_slice=stream_slice,

unit_tests/sources/declarative/requesters/query_properties/test_properties_from_endpoint.py

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,7 @@ def test_get_properties_from_endpoint():
4444
parameters={},
4545
)
4646

47-
properties = list(
48-
properties_from_endpoint.get_properties_from_endpoint(
49-
stream_slice=StreamSlice(cursor_slice={}, partition={})
50-
)
51-
)
47+
properties = properties_from_endpoint.get_properties_from_endpoint()
5248

5349
assert len(properties) == 9
5450
assert properties == expected_properties
@@ -89,11 +85,7 @@ def test_get_properties_from_endpoint_with_multiple_field_paths():
8985
parameters={},
9086
)
9187

92-
properties = list(
93-
properties_from_endpoint.get_properties_from_endpoint(
94-
stream_slice=StreamSlice(cursor_slice={}, partition={})
95-
)
96-
)
88+
properties = properties_from_endpoint.get_properties_from_endpoint()
9789

9890
assert len(properties) == 9
9991
assert properties == expected_properties
@@ -135,11 +127,51 @@ def test_get_properties_from_endpoint_with_interpolation():
135127
parameters={},
136128
)
137129

138-
properties = list(
139-
properties_from_endpoint.get_properties_from_endpoint(
140-
stream_slice=StreamSlice(cursor_slice={}, partition={})
141-
)
142-
)
130+
properties = properties_from_endpoint.get_properties_from_endpoint()
143131

144132
assert len(properties) == 9
145133
assert properties == expected_properties
134+
135+
136+
def test_given_multiple_calls_when_get_properties_from_endpoint_then_only_call_retriever_once():
137+
retriever = Mock(spec=SimpleRetriever)
138+
retriever.read_records.return_value = iter(
139+
[
140+
Record(stream_name="players", data={"id": "ace", "value": 1}),
141+
]
142+
)
143+
144+
properties_from_endpoint = PropertiesFromEndpoint(
145+
retriever=retriever,
146+
property_field_path=["value"],
147+
config={},
148+
parameters={},
149+
)
150+
151+
properties_from_endpoint.get_properties_from_endpoint()
152+
properties_from_endpoint.get_properties_from_endpoint()
153+
properties_from_endpoint.get_properties_from_endpoint()
154+
155+
assert retriever.read_records.call_count == 1
156+
157+
158+
def test_given_value_is_int_when_get_properties_from_endpoint_then_return_str():
159+
expected_properties = ["1"]
160+
161+
retriever = Mock(spec=SimpleRetriever)
162+
retriever.read_records.return_value = iter(
163+
[
164+
Record(stream_name="players", data={"id": "ace", "value": 1}),
165+
]
166+
)
167+
168+
properties_from_endpoint = PropertiesFromEndpoint(
169+
retriever=retriever,
170+
property_field_path=["value"],
171+
config={},
172+
parameters={},
173+
)
174+
175+
properties = properties_from_endpoint.get_properties_from_endpoint()
176+
177+
assert properties == expected_properties

unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
from typing import Set
23

34
import pytest
45

@@ -73,7 +74,7 @@ def test_get_request_property_chunks(
7374
expected_property_chunks,
7475
):
7576
configured_properties = set(property_fields)
76-
property_fields = iter(property_fields)
77+
property_fields = property_fields
7778
property_chunking = PropertyChunking(
7879
property_limit_type=property_limit_type,
7980
property_limit=property_limit,
@@ -100,7 +101,7 @@ def test_get_request_property_chunks_empty_configured_properties():
100101

101102
always_include_properties = ["white", "lotus"]
102103
property_fields = ["maui", "taormina", "koh_samui", "saint_jean_cap_ferrat"]
103-
configured_properties = set()
104+
configured_properties: Set[str] = set()
104105
property_chunking = PropertyChunking(
105106
property_limit_type=PropertyLimitType.property_count,
106107
property_limit=3,
@@ -155,3 +156,27 @@ def test_get_merge_key():
155156

156157
merge_key = property_chunking.get_merge_key(record=record)
157158
assert merge_key == "0"
159+
160+
161+
def test_given_single_property_chunk_when_get_request_property_chunks_then_always_include_properties_are_not_added_to_input_list():
162+
"""
163+
This test is used to validate that we don't manipulate the in-memory values from get_request_property_chunks
164+
"""
165+
property_chunking = PropertyChunking(
166+
property_limit_type=PropertyLimitType.property_count,
167+
property_limit=None,
168+
record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}),
169+
config=CONFIG,
170+
parameters={},
171+
)
172+
173+
property_fields = ["rick", "chelsea", "victoria", "tim", "saxon", "lochlan", "piper"]
174+
list(
175+
property_chunking.get_request_property_chunks(
176+
property_fields=property_fields,
177+
always_include_properties=["id"],
178+
configured_properties=None,
179+
)
180+
)
181+
182+
assert property_fields == ["rick", "chelsea", "victoria", "tim", "saxon", "lochlan", "piper"]

unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py

Lines changed: 8 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,6 @@ def test_get_request_property_chunks_static_list_with_chunking_property_selectio
5050
["santa", "clover", "junpei", "june", "remove_me"]
5151
)
5252

53-
stream_slice = StreamSlice(cursor_slice={}, partition={})
54-
5553
query_properties = QueryProperties(
5654
property_list=[
5755
"ace",
@@ -85,7 +83,7 @@ def test_get_request_property_chunks_static_list_with_chunking_property_selectio
8583
parameters={},
8684
)
8785

88-
property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice))
86+
property_chunks = list(query_properties.get_request_property_chunks())
8987

9088
assert len(property_chunks) == 2
9189
assert property_chunks[0] == ["santa", "clover", "junpei"]
@@ -107,8 +105,6 @@ def test_get_request_property_chunks_static_list_with_always_include_properties(
107105
]
108106
)
109107

110-
stream_slice = StreamSlice(cursor_slice={}, partition={})
111-
112108
query_properties = QueryProperties(
113109
property_list=[
114110
"ace",
@@ -141,7 +137,7 @@ def test_get_request_property_chunks_static_list_with_always_include_properties(
141137
parameters={},
142138
)
143139

144-
property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice))
140+
property_chunks = list(query_properties.get_request_property_chunks())
145141

146142
assert len(property_chunks) == 3
147143
assert property_chunks[0] == ["zero", "ace", "snake", "santa"]
@@ -154,8 +150,6 @@ def test_get_request_no_property_chunking_selected_properties_always_include_pro
154150
["santa", "clover", "junpei", "june", "remove_me"]
155151
)
156152

157-
stream_slice = StreamSlice(cursor_slice={}, partition={})
158-
159153
query_properties = QueryProperties(
160154
property_list=[
161155
"ace",
@@ -182,7 +176,7 @@ def test_get_request_no_property_chunking_selected_properties_always_include_pro
182176
parameters={},
183177
)
184178

185-
property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice))
179+
property_chunks = list(query_properties.get_request_property_chunks())
186180

187181
assert len(property_chunks) == 1
188182
assert property_chunks[0] == ["zero", "santa", "clover", "junpei", "june"]
@@ -203,8 +197,6 @@ def test_get_request_no_property_chunking_always_include_properties():
203197
]
204198
)
205199

206-
stream_slice = StreamSlice(cursor_slice={}, partition={})
207-
208200
query_properties = QueryProperties(
209201
property_list=[
210202
"ace",
@@ -231,7 +223,7 @@ def test_get_request_no_property_chunking_always_include_properties():
231223
parameters={},
232224
)
233225

234-
property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice))
226+
property_chunks = list(query_properties.get_request_property_chunks())
235227

236228
assert len(property_chunks) == 1
237229
assert property_chunks[0] == [
@@ -252,9 +244,6 @@ def test_get_request_property_chunks_dynamic_endpoint():
252244
configured_airbyte_stream = _create_configured_airbyte_stream(
253245
["alice", "clover", "dio", "k", "luna", "phi", "quark", "sigma", "tenmyouji"]
254246
)
255-
256-
stream_slice = StreamSlice(cursor_slice={}, partition={})
257-
258247
properties_from_endpoint_mock = Mock(spec=PropertiesFromEndpoint)
259248
properties_from_endpoint_mock.get_properties_from_endpoint.return_value = iter(
260249
["alice", "clover", "dio", "k", "luna", "phi", "quark", "sigma", "tenmyouji"]
@@ -282,15 +271,14 @@ def test_get_request_property_chunks_dynamic_endpoint():
282271
parameters={},
283272
)
284273

285-
property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice))
274+
property_chunks = list(query_properties.get_request_property_chunks())
286275

287276
assert len(property_chunks) == 2
288277
assert property_chunks[0] == ["alice", "clover", "dio", "k", "luna"]
289278
assert property_chunks[1] == ["phi", "quark", "sigma", "tenmyouji"]
290279

291280

292281
def test_get_request_property_chunks_with_configured_catalog_static_list():
293-
stream_slice = StreamSlice(cursor_slice={}, partition={})
294282
# Simulate configured_airbyte_stream whose json_schema only enables 'luna', 'phi', 'sigma'
295283
configured_airbyte_stream = _create_configured_airbyte_stream(
296284
["santa", "clover", "junpei", "june"]
@@ -328,7 +316,7 @@ def test_get_request_property_chunks_with_configured_catalog_static_list():
328316
parameters={},
329317
)
330318

331-
property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice))
319+
property_chunks = list(query_properties.get_request_property_chunks())
332320

333321
assert len(property_chunks) == 2
334322
assert property_chunks[0] == ["santa", "clover", "junpei"]
@@ -340,8 +328,6 @@ def test_get_request_property_chunks_with_configured_catalog_dynamic_endpoint():
340328
["luna", "phi", "sigma", "remove_me"]
341329
)
342330

343-
stream_slice = StreamSlice(cursor_slice={}, partition={})
344-
345331
properties_from_endpoint_mock = Mock(spec=PropertiesFromEndpoint)
346332
properties_from_endpoint_mock.get_properties_from_endpoint.return_value = iter(
347333
["alice", "clover", "dio", "k", "luna", "phi", "quark", "sigma", "tenmyouji"]
@@ -369,15 +355,13 @@ def test_get_request_property_chunks_with_configured_catalog_dynamic_endpoint():
369355
parameters={},
370356
)
371357

372-
property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice))
358+
property_chunks = list(query_properties.get_request_property_chunks())
373359

374360
assert len(property_chunks) == 1
375361
assert property_chunks[0] == ["luna", "phi", "sigma"]
376362

377363

378364
def test_get_request_property_no_property_selection():
379-
stream_slice = StreamSlice(cursor_slice={}, partition={})
380-
381365
query_properties = QueryProperties(
382366
property_list=[
383367
"ace",
@@ -403,7 +387,7 @@ def test_get_request_property_no_property_selection():
403387
parameters={},
404388
)
405389

406-
property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice))
390+
property_chunks = list(query_properties.get_request_property_chunks())
407391

408392
assert len(property_chunks) == 3
409393
assert property_chunks[0] == ["ace", "snake", "santa"]

0 commit comments

Comments
 (0)