Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.retrievers import Retriever
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.types import Config, StreamSlice


Expand All @@ -22,19 +23,27 @@ class PropertiesFromEndpoint:
config: Config
parameters: InitVar[Mapping[str, Any]]

_cached_properties: Optional[List[str]] = None

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._property_field_path = [
InterpolatedString(string=property_field, parameters=parameters)
for property_field in self.property_field_path
]

def get_properties_from_endpoint(self, stream_slice: Optional[StreamSlice]) -> Iterable[str]:
response_properties = self.retriever.read_records(
records_schema={}, stream_slice=stream_slice
)
for property_obj in response_properties:
path = [
node.eval(self.config) if not isinstance(node, str) else node
for node in self._property_field_path
]
yield dpath.get(property_obj, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure
def get_properties_from_endpoint(self, stream_slice: Optional[StreamSlice]) -> List[str]:
if self._cached_properties is None:
self._cached_properties = list(
map(
self._get_property, # type: ignore # SimpleRetriever and AsyncRetriever only returns Record. Should we change the return type of Retriever.read_records?
self.retriever.read_records(records_schema={}, stream_slice=stream_slice),
)
)
return self._cached_properties

def _get_property(self, property_obj: Mapping[str, Any]) -> str:
path = [
node.eval(self.config) if not isinstance(node, str) else node
for node in self._property_field_path
]
return str(dpath.get(property_obj, path, default=[])) # type: ignore # extracted will be a MutableMapping, given input data structure
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
)

def get_request_property_chunks(
self, property_fields: Iterable[str], always_include_properties: Optional[List[str]]
self, property_fields: List[str], always_include_properties: Optional[List[str]]
) -> Iterable[List[str]]:
if not self.property_limit:
single_property_chunk = list(property_fields)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def get_request_property_chunks(
:param stream_slice: The StreamSlice of the current partition being processed during the sync. This is included
because subcomponents of QueryProperties can make use of interpolation of the top-level StreamSlice object
"""
fields: Union[Iterable[str], List[str]]
fields: List[str]
if isinstance(self.property_list, PropertiesFromEndpoint):
fields = self.property_list.get_properties_from_endpoint(stream_slice=stream_slice)
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,6 @@ def read_records(
"""
_slice = stream_slice or StreamSlice(partition={}, cursor_slice={}) # None-check

most_recent_record_from_slice = None
record_generator = partial(
self._parse_records,
stream_slice=stream_slice,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,8 @@ def test_get_properties_from_endpoint():
parameters={},
)

properties = list(
properties_from_endpoint.get_properties_from_endpoint(
stream_slice=StreamSlice(cursor_slice={}, partition={})
)
properties = properties_from_endpoint.get_properties_from_endpoint(
stream_slice=StreamSlice(cursor_slice={}, partition={})
)

assert len(properties) == 9
Expand Down Expand Up @@ -89,10 +87,8 @@ def test_get_properties_from_endpoint_with_multiple_field_paths():
parameters={},
)

properties = list(
properties_from_endpoint.get_properties_from_endpoint(
stream_slice=StreamSlice(cursor_slice={}, partition={})
)
properties = properties_from_endpoint.get_properties_from_endpoint(
stream_slice=StreamSlice(cursor_slice={}, partition={})
)

assert len(properties) == 9
Expand Down Expand Up @@ -135,11 +131,61 @@ def test_get_properties_from_endpoint_with_interpolation():
parameters={},
)

properties = list(
properties_from_endpoint.get_properties_from_endpoint(
stream_slice=StreamSlice(cursor_slice={}, partition={})
)
properties = properties_from_endpoint.get_properties_from_endpoint(
stream_slice=StreamSlice(cursor_slice={}, partition={})
)

assert len(properties) == 9
assert properties == expected_properties


def test_given_multiple_calls_when_get_properties_from_endpoint_then_only_call_retriever_once():
retriever = Mock(spec=SimpleRetriever)
retriever.read_records.return_value = iter(
[
Record(stream_name="players", data={"id": "ace", "value": 1}),
]
)

properties_from_endpoint = PropertiesFromEndpoint(
retriever=retriever,
property_field_path=["value"],
config={},
parameters={},
)

properties_from_endpoint.get_properties_from_endpoint(
stream_slice=StreamSlice(cursor_slice={}, partition={})
)
properties_from_endpoint.get_properties_from_endpoint(
stream_slice=StreamSlice(cursor_slice={}, partition={})
)
properties_from_endpoint.get_properties_from_endpoint(
stream_slice=StreamSlice(cursor_slice={}, partition={})
)

assert retriever.read_records.call_count == 1


def test_given_value_is_int_when_get_properties_from_endpoint_then_return_str():
expected_properties = ["1"]

retriever = Mock(spec=SimpleRetriever)
retriever.read_records.return_value = iter(
[
Record(stream_name="players", data={"id": "ace", "value": 1}),
]
)

properties_from_endpoint = PropertiesFromEndpoint(
retriever=retriever,
property_field_path=["value"],
config={},
parameters={},
)

properties = properties_from_endpoint.get_properties_from_endpoint(
stream_slice=StreamSlice(cursor_slice={}, partition={})
)

assert properties == expected_properties
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ def test_get_request_property_chunks(
property_limit,
expected_property_chunks,
):
property_fields = iter(property_fields)
property_chunking = PropertyChunking(
property_limit_type=property_limit_type,
property_limit=property_limit,
Expand Down Expand Up @@ -104,3 +103,26 @@ def test_get_merge_key():

merge_key = property_chunking.get_merge_key(record=record)
assert merge_key == "0"


def test_given_single_property_chunk_when_get_request_property_chunks_then_always_include_properties_are_not_added_to_input_list():
"""
This test is used to validate that we don't manipulate the in-memory values from get_request_property_chunks
"""
property_chunking = PropertyChunking(
property_limit_type=PropertyLimitType.property_count,
property_limit=None,
record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}),
config=CONFIG,
parameters={},
)

property_fields = ["rick", "chelsea", "victoria", "tim", "saxon", "lochlan", "piper"]
list(
property_chunking.get_request_property_chunks(
property_fields=property_fields,
always_include_properties=["id"],
)
)

assert property_fields == ["rick", "chelsea", "victoria", "tim", "saxon", "lochlan", "piper"]
Loading